You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/08/02 15:55:05 UTC

svn commit: r1803838 - /aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java

Author: csierra
Date: Wed Aug  2 15:55:05 2017
New Revision: 1803838

URL: http://svn.apache.org/viewvc?rev=1803838&view=rev
Log:
Fix generic flatMap

It was not propagating removed instances.

Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1803838&r1=1803837&r2=1803838&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Wed Aug  2 15:55:05 2017
@@ -52,7 +52,7 @@ public class OSGiImpl<T> implements OSGi
 	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
 		return new OSGiImpl<>(
 			((bundleContext) -> {
-				Map<Object, OSGiResult<? extends S>> identities =
+				Map<Object, OSGiResult<?>> identities =
 					new IdentityHashMap<>();
 
 				AtomicReference<Runnable> closeReference =
@@ -66,61 +66,52 @@ public class OSGiImpl<T> implements OSGi
 
 				Consumer<Tuple<S>> removedSource = removed.getSource();
 
-				AtomicReference<Tuple<S>> tupleAtomicReference =
-					new AtomicReference<>();
-
-				OSGiResultImpl<S> osgiResult = new OSGiResultImpl<>(
-					added, removed, null,
+				return new OSGiResultImpl<>(
+					added, removed,
 					() -> {
-						synchronized (identities) {
-							identities.values().forEach(OSGiResult::close);
-						}
+						OSGiResultImpl<T> or1 = _operation.run(bundleContext);
 
-						closeReference.get().run();
-					});
-
-				osgiResult.start = () -> {
-					OSGiResultImpl<T> or1 = _operation.run(bundleContext);
+						closeReference.set(or1.close);
 
-					closeReference.set(or1.close);
+						or1.added.map(t -> {
+							OSGiImpl<S> program =
+								(OSGiImpl<S>)fun.apply(t.t);
 
-					or1.added.map(t -> {
-						OSGi<? extends S> program = fun.apply(t.t);
+							OSGiResultImpl<S> or2 =
+								program._operation.run(bundleContext);
 
-						OSGiResult<? extends S> or2 = program.run(
-							bundleContext,
-							s -> {
-								Tuple<S> tuple = Tuple.create(s);
+							or2.added.map(s -> {addedSource.accept(s); return null;});
+							or2.removed.map(s -> {removedSource.accept(s); return null;});
 
-								tupleAtomicReference.set(tuple);
+							or2.start.run();
 
-								addedSource.accept(tuple);
-							});
+							identities.put(t.original, or2);
 
-						identities.put(t.original, or2);
+							return null;
+						});
 
-						return null;
-					});
+						or1.removed.map(t -> {
+							synchronized (identities) {
+								OSGiResult<?> osgiResult1 =
+									identities.remove(t.original);
 
-					or1.removed.map(t -> {
-						synchronized (identities) {
-							OSGiResult<? extends S> osgiResult1 =
-								identities.remove(t.original);
+								if (osgiResult1 != null) {
+									osgiResult1.close();
+								}
+							}
 
-							removedSource.accept(tupleAtomicReference.get());
+							return null;
+						});
 
-							if (osgiResult1 != null) {
-								osgiResult1.close();
-							}
+						or1.start.run();
+					},
+					() -> {
+						synchronized (identities) {
+							identities.values().forEach(OSGiResult::close);
 						}
 
-						return null;
+						closeReference.get().run();
 					});
-
-					or1.start.run();
-				};
-
-				return osgiResult;
 			}
 			));
 	}