You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/08/02 15:55:05 UTC
svn commit: r1803838 -
/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
Author: csierra
Date: Wed Aug 2 15:55:05 2017
New Revision: 1803838
URL: http://svn.apache.org/viewvc?rev=1803838&view=rev
Log:
Fix generic flatMap
It was not propagating removed instances.
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1803838&r1=1803837&r2=1803838&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Wed Aug 2 15:55:05 2017
@@ -52,7 +52,7 @@ public class OSGiImpl<T> implements OSGi
public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
return new OSGiImpl<>(
((bundleContext) -> {
- Map<Object, OSGiResult<? extends S>> identities =
+ Map<Object, OSGiResult<?>> identities =
new IdentityHashMap<>();
AtomicReference<Runnable> closeReference =
@@ -66,61 +66,52 @@ public class OSGiImpl<T> implements OSGi
Consumer<Tuple<S>> removedSource = removed.getSource();
- AtomicReference<Tuple<S>> tupleAtomicReference =
- new AtomicReference<>();
-
- OSGiResultImpl<S> osgiResult = new OSGiResultImpl<>(
- added, removed, null,
+ return new OSGiResultImpl<>(
+ added, removed,
() -> {
- synchronized (identities) {
- identities.values().forEach(OSGiResult::close);
- }
+ OSGiResultImpl<T> or1 = _operation.run(bundleContext);
- closeReference.get().run();
- });
-
- osgiResult.start = () -> {
- OSGiResultImpl<T> or1 = _operation.run(bundleContext);
+ closeReference.set(or1.close);
- closeReference.set(or1.close);
+ or1.added.map(t -> {
+ OSGiImpl<S> program =
+ (OSGiImpl<S>)fun.apply(t.t);
- or1.added.map(t -> {
- OSGi<? extends S> program = fun.apply(t.t);
+ OSGiResultImpl<S> or2 =
+ program._operation.run(bundleContext);
- OSGiResult<? extends S> or2 = program.run(
- bundleContext,
- s -> {
- Tuple<S> tuple = Tuple.create(s);
+ or2.added.map(s -> {addedSource.accept(s); return null;});
+ or2.removed.map(s -> {removedSource.accept(s); return null;});
- tupleAtomicReference.set(tuple);
+ or2.start.run();
- addedSource.accept(tuple);
- });
+ identities.put(t.original, or2);
- identities.put(t.original, or2);
+ return null;
+ });
- return null;
- });
+ or1.removed.map(t -> {
+ synchronized (identities) {
+ OSGiResult<?> osgiResult1 =
+ identities.remove(t.original);
- or1.removed.map(t -> {
- synchronized (identities) {
- OSGiResult<? extends S> osgiResult1 =
- identities.remove(t.original);
+ if (osgiResult1 != null) {
+ osgiResult1.close();
+ }
+ }
- removedSource.accept(tupleAtomicReference.get());
+ return null;
+ });
- if (osgiResult1 != null) {
- osgiResult1.close();
- }
+ or1.start.run();
+ },
+ () -> {
+ synchronized (identities) {
+ identities.values().forEach(OSGiResult::close);
}
- return null;
+ closeReference.get().run();
});
-
- or1.start.run();
- };
-
- return osgiResult;
}
));
}