You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/10/10 15:51:24 UTC

svn commit: r1811723 - in /aries/trunk/component-dsl: component-dsl/src/main/java/org/apache/aries/osgi/functional/ component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ itests/src/main/java/org/apache/aries/osgi/functional/test/

Author: csierra
Date: Tue Oct 10 15:51:22 2017
New Revision: 1811723

URL: http://svn.apache.org/viewvc?rev=1811723&view=rev
Log:
[Component-DSL] refactor to remove Pipe

Pipe is not needed.

Removed:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java Tue Oct 10 15:51:22 2017
@@ -24,6 +24,6 @@ import org.osgi.framework.BundleContext;
  */
 public interface OSGiOperation<T> {
 
-	OSGiResult<T> run(BundleContext bundleContext);
+	OSGiResult run(BundleContext bundleContext);
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java Tue Oct 10 15:51:22 2017
@@ -20,8 +20,10 @@ package org.apache.aries.osgi.functional
 /**
  * @author Carlos Sierra Andrés
  */
-public interface OSGiResult<T> extends AutoCloseable {
+public interface OSGiResult extends AutoCloseable {
 
 	@Override
 	public void close();
+
+	public void start();
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java Tue Oct 10 15:51:22 2017
@@ -25,7 +25,7 @@ import java.util.function.Consumer;
  * @author Carlos Sierra Andrés
  */
 public interface OSGiRunnable<T> {
-	OSGiResult<T> run(BundleContext bundleContext);
+	OSGiResult run(BundleContext bundleContext);
 
-	OSGiResult<T> run(BundleContext bundleContext, Consumer<T> andThen);
+	OSGiResult run(BundleContext bundleContext, Consumer<T> andThen);
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -25,7 +25,7 @@ import org.osgi.framework.BundleContext;
 public class BundleContextOSGiImpl extends OSGiImpl<BundleContext> {
 
 	public BundleContextOSGiImpl() {
-		super(bundleContext ->
-			new JustOSGiImpl<>(bundleContext)._operation.run(bundleContext));
+		super((bundleContext, op) ->
+			new JustOSGiImpl<>(bundleContext)._operation.run(bundleContext, op));
 	}
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java Tue Oct 10 15:51:22 2017
@@ -35,11 +35,7 @@ public class BundleOSGi extends OSGiImpl
 	private final int _stateMask;
 
 	public BundleOSGi(int stateMask) {
-		super(bundleContext -> {
-			Pipe<Bundle, Bundle> added = Pipe.create();
-
-			Consumer<Tuple<Bundle>> addedSource = added.getSource();
-
+		super((bundleContext, op) -> {
 			BundleTracker<Tuple<Bundle>> bundleTracker =
 				new BundleTracker<>(
 					bundleContext, stateMask,
@@ -51,7 +47,7 @@ public class BundleOSGi extends OSGiImpl
 
 							Tuple<Bundle> tuple = Tuple.create(bundle);
 
-							addedSource.accept(tuple);
+							op.accept(tuple);
 
 							return tuple;
 						}
@@ -75,8 +71,8 @@ public class BundleOSGi extends OSGiImpl
 						}
 					});
 
-			return new OSGiResultImpl<>(
-				added, bundleTracker::open, bundleTracker::close);
+			return new OSGiResultImpl(
+				bundleTracker::open, bundleTracker::close);
 		});
 
 		_stateMask = stateMask;
@@ -86,27 +82,23 @@ public class BundleOSGi extends OSGiImpl
 	public <S> OSGiImpl<S> flatMap(
 		Function<? super Bundle, OSGi<? extends S>> fun) {
 
-		return new OSGiImpl<>(bundleContext -> {
-			Pipe<S, S> added = Pipe.create();
-
-			Consumer<Tuple<S>> addedSource = added.getSource();
-
-			BundleTracker<OSGiResult<S>> bundleTracker =
+		return new OSGiImpl<>((bundleContext, op) -> {
+			BundleTracker<OSGiResult> bundleTracker =
 				new BundleTracker<>(
 					bundleContext, _stateMask,
-					new BundleTrackerCustomizer<OSGiResult<S>>() {
+					new BundleTrackerCustomizer<OSGiResult>() {
 
 						@Override
-						public OSGiResult<S> addingBundle(
+						public OSGiResult addingBundle(
 							Bundle bundle, BundleEvent bundleEvent) {
 
 							OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(
 								bundle);
 
-							OSGiResultImpl<S> result =
-								program._operation.run(bundleContext);
+							OSGiResultImpl result = program._operation.run(
+								bundleContext, op);
 
-							result.pipeTo(addedSource);
+							result.start();
 
 							return result;
 						}
@@ -114,7 +106,7 @@ public class BundleOSGi extends OSGiImpl
 						@Override
 						public void modifiedBundle(
 							Bundle bundle, BundleEvent bundleEvent,
-							OSGiResult<S> result) {
+							OSGiResult result) {
 
 							removedBundle(bundle, bundleEvent, result);
 
@@ -124,14 +116,14 @@ public class BundleOSGi extends OSGiImpl
 						@Override
 						public void removedBundle(
 							Bundle bundle, BundleEvent bundleEvent,
-							OSGiResult<S> result) {
+							OSGiResult result) {
 
 							result.close();
 						}
 					});
 
-			return new OSGiResultImpl<>(
-				added, bundleTracker::open, bundleTracker::close);
+			return new OSGiResultImpl(
+				bundleTracker::open, bundleTracker::close);
 
 		});
 	}

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -28,6 +28,7 @@ public class ChangeContextOSGiImpl<T> ex
 	public ChangeContextOSGiImpl(
 		OSGi<T> program, BundleContext bundleContext) {
 
-		super(b -> ((OSGiImpl<T>) program)._operation.run(bundleContext));
+		super((b, op) ->
+			((OSGiImpl<T>) program)._operation.run(bundleContext, op));
 	}
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -23,7 +23,6 @@ import org.osgi.service.cm.ManagedServic
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 
 /**
  * @author Carlos Sierra Andrés
@@ -32,7 +31,7 @@ public class ConfigurationOSGiImpl
 	extends OSGiImpl<Dictionary<String, ?>> {
 
 	public ConfigurationOSGiImpl(String pid) {
-		super(bundleContext -> {
+		super((bundleContext, op) -> {
 			AtomicReference<Dictionary<String, ?>> atomicReference =
 				new AtomicReference<>(null);
 
@@ -43,26 +42,20 @@ public class ConfigurationOSGiImpl
 			AtomicReference<ServiceRegistration<ManagedService>>
 				serviceRegistrationReferece = new AtomicReference<>(null);
 
-			Pipe<Dictionary<String, ?>, Dictionary<String, ?>> added =
-				Pipe.create();
-
-			Consumer<Tuple<Dictionary<String, ?>>> addedSource =
-				added.getSource();
-
 			Runnable start = () ->
 				serviceRegistrationReferece.set(
 					bundleContext.registerService(
 						ManagedService.class,
 						properties -> {
 							while (!atomicReference.compareAndSet(
-								tupleAtomicReference.get().t,
+								tupleAtomicReference.get()._t,
 								properties)) {
 							}
 
 							Tuple<Dictionary<String, ?>> old =
 								tupleAtomicReference.get();
 
-							if (old.t != null) {
+							if (old._t != null) {
 								old.terminate();
 							}
 
@@ -70,7 +63,7 @@ public class ConfigurationOSGiImpl
 								Tuple.create(properties);
 
 							if (properties != null) {
-								addedSource.accept(tuple);
+								op.accept(tuple);
 							}
 
 							tupleAtomicReference.set(tuple);
@@ -79,8 +72,8 @@ public class ConfigurationOSGiImpl
 							put("service.pid", pid);
 						}}));
 
-			return new OSGiResultImpl<>(
-				added, start,
+			return new OSGiResultImpl(
+				start,
 				() -> {
 					serviceRegistrationReferece.get().unregister();
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -35,32 +35,25 @@ public class ConfigurationsOSGiImpl
 	extends OSGiImpl<Dictionary<String, ?>> {
 
 	public ConfigurationsOSGiImpl(String factoryPid) {
-		super(bundleContext -> {
+		super((bundleContext, op) -> {
 			Map<String, Tuple<Dictionary<String, ?>>> results =
 				new ConcurrentHashMap<>();
 
 			AtomicReference<ServiceRegistration<ManagedServiceFactory>>
 				serviceRegistrationReference = new AtomicReference<>(null);
 
-			Pipe<Dictionary<String, ?>, Dictionary<String, ?>>
-				added = Pipe.create();
-
-			Consumer<Tuple<Dictionary<String, ?>>> addedSource =
-				added.getSource();
-
 			Runnable start = () ->
 				serviceRegistrationReference.set(
 					bundleContext.registerService(
 						ManagedServiceFactory.class,
-						new ConfigurationsManagedServiceFactory(
-							results, addedSource),
+						new ConfigurationsManagedServiceFactory(results, op),
 						new Hashtable<String, Object>() {{
 							put("service.pid", factoryPid);
 						}}));
 
 
-			return new OSGiResultImpl<>(
-				added, start,
+			return new OSGiResultImpl(
+				start,
 				() -> {
 					serviceRegistrationReference.get().unregister();
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java Tue Oct 10 15:51:22 2017
@@ -33,30 +33,21 @@ public class DistributeOSGi<T> extends O
 
     @SafeVarargs
     public DistributeOSGi(OSGi<T>... programs) {
-        super(bundleContext -> {
-            Pipe<T, T> added = Pipe.create();
+        super((bundleContext, op) -> {
+            List<OSGiResult> results = new ArrayList<>();
 
-            Consumer<Tuple<T>> addedSource = added.getSource();
-
-            List<OSGiResult<T>> results = new ArrayList<>();
-
-            return new OSGiResultImpl<>(
-                added,
-                () ->
+            return new OSGiResultImpl(
+                () -> {
                     results.addAll(
                         Arrays.stream(programs).
-                            map(o -> {
-                                OSGiResultImpl<T> osGiResult =
-                                    ((OSGiImpl<T>) o)._operation.run(
-                                        bundleContext);
-
-                                osGiResult.pipeTo(addedSource);
+                            map(o -> ((OSGiImpl<T>) o)._operation.run(
+                                bundleContext, op)).
+                            collect(Collectors.toList()));
 
-                                return osGiResult;
-                            }).
-                            collect(Collectors.toList())),
+                    results.forEach(OSGiResult::start);
+                },
                 () -> {
-                    for (OSGiResult<?> result : results) {
+                    for (OSGiResult result : results) {
                         try {
                             result.close();
                         }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java Tue Oct 10 15:51:22 2017
@@ -18,12 +18,7 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -34,40 +29,19 @@ public class FlatMapImpl<T, S> extends O
 	public FlatMapImpl(
 		OSGiImpl<T> previous, Function<? super T, OSGi<? extends S>> fun) {
 
-		super((bundleContext) -> {
-			AtomicReference<Runnable> closeReference =
-				new AtomicReference<>(NOOP);
+		super((bundleContext, op) ->
+			previous._operation.run(
+				bundleContext,
+				t -> {
+					OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(t._t);
 
-			Pipe<S, S> added = Pipe.create();
+					OSGiResultImpl result =
+						program._operation.run(bundleContext, op);
 
-			Consumer<Tuple<S>> addedSource = added.getSource();
+					t.onTermination(result::close);
 
-			return new OSGiResultImpl<>(
-				added,
-				() -> {
-					OSGiResultImpl<T> or1 = previous._operation.run(
-						bundleContext);
-
-					closeReference.set(or1.close);
-
-					or1.added.map(t -> {
-						OSGiImpl<S> program = (OSGiImpl<S>)fun.apply((T)t.t);
-
-						OSGiResultImpl<S> or2 =
-							program._operation.run(bundleContext);
-
-						t.onTermination(or2::close);
-
-						or2.pipeTo(addedSource);
-
-						return null;
-					});
-
-					or1.start.run();
-				},
-				() -> closeReference.get().run());
-			}
-		);
+					result.start();
+				}));
 	}
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -18,85 +18,37 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class JustOSGiImpl<T> extends OSGiImpl<T> {
 
-	private Supplier<Collection<T>> _t;
-
 	public JustOSGiImpl(Collection<T> t) {
 		this(() -> t);
 	}
 
 	public JustOSGiImpl(Supplier<Collection<T>> t) {
-		super(((bundleContext) -> {
-
-			Pipe<T, T> added = Pipe.create();
+		super((bundleContext, op) -> {
 
-			AtomicReference<Collection<Tuple<T>>> collectionAtomicReference =
-				new AtomicReference<>();
-
-			return new OSGiResultImpl<>(
-				added,
-				() -> {
-					List<Tuple<T>> tuples =
-						t.get().stream().map(Tuple::create).collect(
-							Collectors.toList());
-
-					collectionAtomicReference.set(tuples);
-
-					tuples.forEach(tuple ->
-						added.getSource().accept(tuple));
-				},
-				() ->
-					collectionAtomicReference.get().forEach(Tuple::terminate));
-		}));
+			Collection<Tuple<T>> references =
+				t.get().stream().map(Tuple::create).collect(
+					Collectors.toList());
+
+			return new OSGiResultImpl(
+				() -> references.forEach(op),
+				() -> references.forEach(Tuple::terminate));
+		});
 
-		_t = t;
 	}
 
 	public JustOSGiImpl(T t) {
 		this(() -> Collections.singletonList(t));
 	}
 
-	@Override
-	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
-		return new OSGiImpl<>(bundleContext -> {
-			Pipe<S, S> added = Pipe.create();
-
-			AtomicReference<Runnable> atomicReference = new AtomicReference<>(
-				NOOP);
-
-			return new OSGiResultImpl<>(
-				added,
-				() -> {
-					List<OSGiResultImpl<S>> results = _t.get().stream().map(
-						p -> (OSGiImpl<S>) fun.apply(p)
-					).map(
-						n -> n._operation.run(bundleContext)
-					).collect(Collectors.toList());
-
-					atomicReference.set(
-						() -> results.forEach(OSGiResult::close));
-
-					results.forEach(result -> result.pipeTo(added.getSource()));
-				},
-				() -> atomicReference.get().run());
-		});
-	}
-
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -25,7 +25,6 @@ import org.apache.aries.osgi.functional.
 public class NothingOSGiImpl<S> extends OSGiImpl<S> {
 
 	public NothingOSGiImpl() {
-		super(((bundleContext) -> new OSGiResultImpl<>(
-			Pipe.create(), OSGi.NOOP, OSGi.NOOP)));
+		super((bundleContext, __) -> new OSGiResultImpl(OSGi.NOOP, OSGi.NOOP));
 	}
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -54,46 +54,41 @@ public class OSGiImpl<T> implements OSGi
 	public OSGi<Void> foreach(
 		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
-		return new OSGiImpl<>(((bundleContext) -> {
-			OSGiResultImpl<T> osgiResult = _operation.run(bundleContext);
+		return new OSGiImpl<>((bundleContext, op) ->
+			_operation.run(
+				bundleContext,
+            	t -> {
+                	t.onTermination(() -> onRemoved.accept(t._t));
 
-			return new OSGiResultImpl<>(
-				osgiResult.added.map(
-					t -> {
-						t.onTermination(() -> onRemoved.accept(t.t));
-
-						return t.map(o -> {onAdded.accept(o); return null;});
-					}),
-				osgiResult.start, osgiResult.close);
-		}));
+                	onAdded.accept(t._t);
+
+					Tuple<Void> tuple = Tuple.create(null);
+
+					t.addRelatedTuple(tuple);
+
+					op.accept(tuple);
+            	}));
 	}
 
 	@Override
 	public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
-		return new OSGiImpl<>(((bundleContext) -> {
-			OSGiResultImpl<T> osgiResult = _operation.run(bundleContext);
-
-			return new OSGiResultImpl<>(
-				osgiResult.added.map(t -> t.map(function)),
-				osgiResult.start, osgiResult.close);
-		}));
+		return new OSGiImpl<>((bundleContext, op) ->
+			_operation.run(bundleContext, t -> op.accept(t.map(function))));
 	}
 
 	@Override
-	public OSGiResult<T> run(BundleContext bundleContext) {
+	public OSGiResult run(BundleContext bundleContext) {
 		return run(bundleContext, x -> {});
 	}
 
 	@Override
-	public OSGiResult<T> run(BundleContext bundleContext, Consumer<T> andThen) {
-		OSGiResultImpl<T> osgiResult = _operation.run(bundleContext);
+	public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) {
+		OSGiResultImpl osgiResult =
+			_operation.run(bundleContext, t -> andThen.accept(t._t));
 
-		osgiResult.added.map(x -> {andThen.accept(x.t); return x;});
+		osgiResult.start();
 
-		osgiResult.start.run();
-
-		return new OSGiResultImpl<>(
-			osgiResult.added, osgiResult.start, osgiResult.close);
+		return osgiResult;
 	}
 
 	@Override
@@ -167,47 +162,14 @@ public class OSGiImpl<T> implements OSGi
 		return new RouteOsgiImpl<>(this, routerConsumer);
 	}
 
-	private static class Pair<X, Y> {
-		private final X _first;
-		private final Y _second;
-
-		public Pair(X first, Y second) {
-			_first = first;
-			_second = second;
-		}
-
-		public X getFirst() {
-			return _first;
-		}
-
-		public Y getSecond() {
-			return _second;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) return true;
-			if (o == null || getClass() != o.getClass()) return false;
-
-			Pair<?, ?> pair = (Pair<?, ?>) o;
-
-			return _first.equals(pair._first);
-		}
-
-		@Override
-		public int hashCode() {
-			return _first.hashCode();
-		}
-	}
-
 	@Override
 	public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
 		return new OSGiImpl<>(
-			((bundleContext) -> {
-				AtomicReference<OSGiResult<?>> myCloseReference =
+			((bundleContext, op) -> {
+				AtomicReference<OSGiResult> myCloseReference =
 					new AtomicReference<>();
 
-				AtomicReference<OSGiResult<?>> otherCloseReference =
+				AtomicReference<OSGiResult> otherCloseReference =
 					new AtomicReference<>();
 
 				DoublyLinkedList<Tuple<T>> identities =
@@ -216,52 +178,44 @@ public class OSGiImpl<T> implements OSGi
 				DoublyLinkedList<Tuple<Function<T, S>>> funs =
 					new DoublyLinkedList<>();
 
-				Pipe<S, S> added = Pipe.create();
-
-				Consumer<Tuple<S>> addedSource = added.getSource();
-
-				return new OSGiResultImpl<>(
-					added,
+				return new OSGiResultImpl(
 					() -> {
-						OSGiResultImpl<T> or1 = _operation.run(bundleContext);
+						OSGiResultImpl or1 = _operation.run(
+							bundleContext,
+							t -> {
+								synchronized (identities) {
+									Node<Tuple<T>> node = identities.addLast(t);
+
+									t.onTermination(node::remove);
+
+									funs.forEach(
+										f -> processAdded(op, f, t));
+								}
+							}
+						);
 
 						myCloseReference.set(or1);
 
-						or1.added.map(t -> {
-							synchronized (identities) {
-								Node<Tuple<T>> node = identities.addLast(t);
-
-								t.onTermination(node::remove);
-
-								funs.forEach(f -> processAdded(addedSource, f, t));
-
-								return null;
-							}
-						});
-
-						OSGiResultImpl<Function<T, S>> funRun =
+						OSGiResultImpl funRun =
 							((OSGiImpl<Function<T, S>>) fun)._operation.run(
-								bundleContext);
+								bundleContext,
+								f -> {
+									synchronized (identities) {
+										Node<Tuple<Function<T, S>>> node =
+											funs.addLast(f);
+
+										f.onTermination(node::remove);
+
+										identities.forEach(
+											t -> processAdded(op, f, t));
+									}
+								});
 
 						otherCloseReference.set(funRun);
 
-						funRun.added.map(f -> {
-							synchronized (identities) {
-								Node<Tuple<Function<T, S>>> node =
-									funs.addLast(f);
-
-								f.onTermination(node::remove);
-
-								identities.forEach(
-									t -> processAdded(addedSource, f, t));
-
-								return null;
-							}
-						});
-
-						or1.start.run();
+						or1.start();
 
-						funRun.start.run();
+						funRun.start();
 					},
 					() -> {
 						synchronized (identities) {

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java Tue Oct 10 15:51:22 2017
@@ -18,14 +18,22 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGiOperation;
+import org.apache.aries.osgi.functional.OSGiResult;
 import org.osgi.framework.BundleContext;
 
+import java.util.function.Consumer;
+
 /**
  * @author Carlos Sierra Andrés
  */
 interface OSGiOperationImpl<T> extends OSGiOperation<T> {
 
+	OSGiResultImpl run(
+		BundleContext bundleContext, Consumer<Tuple<T>> consumer);
+
 	@Override
-	OSGiResultImpl<T> run(BundleContext bundleContext);
+	default OSGiResult run(BundleContext bundleContext) {
+		return run(bundleContext, (__) -> {});
+	}
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Tue Oct 10 15:51:22 2017
@@ -20,33 +20,29 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.OSGiResult;
 
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * @author Carlos Sierra Andrés
  */
-public class OSGiResultImpl<T> implements OSGiResult<T> {
+public class OSGiResultImpl implements OSGiResult {
 
-	public Pipe<?, T> added;
 	public Runnable start;
 	public Runnable close;
 
-	public OSGiResultImpl(
-		Pipe<?, T> added, Runnable start, Runnable close) {
-
-		this.added = added;
+	public OSGiResultImpl(Runnable start, Runnable close) {
 		this.start = start;
 		this.close = close;
 	}
 
 	@Override
-	public void close() {
-		close.run();
+	public void start() {
+		start.run();
 	}
 
-	public void pipeTo(Consumer<Tuple<T>> addedSource) {
-		added.map(t -> {addedSource.accept(t); return null;});
-
-		start.run();
+	@Override
+	public void close() {
+		close.run();
 	}
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -23,14 +23,11 @@ package org.apache.aries.osgi.functional
 public class OnCloseOSGiImpl extends OSGiImpl<Void> {
 
 	public OnCloseOSGiImpl(Runnable action) {
-		super(bundleContext -> {
-			Pipe<Void, Void> added = Pipe.create();
-
+		super((bundleContext, op) -> {
 			Tuple<Void> tuple = Tuple.create(null);
 
-			return new OSGiResultImpl<>(
-				added,
-				() -> added.getSource().accept(tuple),
+			return new OSGiResultImpl(
+				() -> op.accept(tuple),
 				() -> {
 					action.run();
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java Tue Oct 10 15:51:22 2017
@@ -27,32 +27,21 @@ public class RouteOsgiImpl<T> extends OS
     public RouteOsgiImpl(
         OSGiImpl<T> previous, Consumer<Router<T>> routerConsumer) {
 
-        super(((bundleContext) -> {
-
-            Pipe<T, T> outgoingAddingPipe = Pipe.create();
-
-            Consumer<Tuple<T>> outgoingAddingSource =
-                outgoingAddingPipe.getSource();
-
+        super((bundleContext, op) -> {
             final RouterImpl<T> router =
-                new RouterImpl<>(outgoingAddingSource);
+                new RouterImpl<>(op);
 
             routerConsumer.accept(router);
 
-            OSGiResultImpl<T> osgiResult = previous._operation.run(
-                bundleContext);
-
-            osgiResult.added.map(
+            OSGiResultImpl osgiResult = previous._operation.run(
+                bundleContext,
                 t -> {
                     router._adding.accept(t);
 
                     t.onTermination(() -> router._leaving.accept(t));
-
-                    return null;
                 });
 
-            return new OSGiResultImpl<>(
-                outgoingAddingPipe,
+            return new OSGiResultImpl(
                 () -> {
                     router._start.run();
                     osgiResult.start.run();
@@ -61,7 +50,7 @@ public class RouteOsgiImpl<T> extends OS
                     router._close.run();
                     osgiResult.close.run();
                 });
-        }));
+        });
     }
 
     static class RouterImpl<T> implements Router<T> {
@@ -94,7 +83,7 @@ public class RouteOsgiImpl<T> extends OS
         public SentEvent<T> signalAdd(Event<T> event) {
             Tuple<T> tuple = (Tuple<T>) event;
 
-            Tuple<T> copy = Tuple.create(tuple.t);
+            Tuple<T> copy = Tuple.create(tuple._t);
 
             tuple.addRelatedTuple(copy);
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Tue Oct 10 15:51:22 2017
@@ -37,18 +37,15 @@ public class ServiceReferenceOSGi<T> ext
 	private Class<T> _clazz;
 
 	public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
-		super(bundleContext -> {
-			Pipe<ServiceReference<T>, ServiceReference<T>>
-				added = Pipe.create();
-
+		super((bundleContext, op) -> {
 			ServiceTracker<T, AtomicReference<Tuple<ServiceReference<T>>>>
 				serviceTracker = new ServiceTracker<>(
 					bundleContext,
 					buildFilter(bundleContext, filterString, clazz),
-					new DefaultServiceTrackerCustomizer<>(added.getSource()));
+					new DefaultServiceTrackerCustomizer<>(op));
 
-			return new OSGiResultImpl<>(
-				added, serviceTracker::open, serviceTracker::close);
+			return new OSGiResultImpl(
+				serviceTracker::open, serviceTracker::close);
 		});
 
 		_filterString = filterString;
@@ -59,19 +56,17 @@ public class ServiceReferenceOSGi<T> ext
 	public <S> OSGiImpl<S> flatMap(
 		Function<? super ServiceReference<T>, OSGi<? extends S>> fun) {
 
-		return new OSGiImpl<>(bundleContext -> {
-			Pipe<S, S> added = Pipe.create();
-
+		return new OSGiImpl<>((bundleContext, op) -> {
 			ServiceTracker<T, ?> serviceTracker =
 				new ServiceTracker<>(
 					bundleContext,
 					buildFilter(
 						bundleContext, _filterString, _clazz),
 						new FlatMapServiceTrackerCustomizer<>(
-							fun, bundleContext, added.getSource()));
+							fun, bundleContext, op));
 
-			return new OSGiResultImpl<>(
-				added, serviceTracker::open, serviceTracker::close);
+			return new OSGiResultImpl(
+				serviceTracker::open, serviceTracker::close);
 		});
 	}
 
@@ -121,45 +116,44 @@ public class ServiceReferenceOSGi<T> ext
 	}
 
 	private static class FlatMapServiceTrackerCustomizer<T, S>
-		implements ServiceTrackerCustomizer<T, AtomicReference<OSGiResult<S>>> {
+		implements ServiceTrackerCustomizer<T, AtomicReference<OSGiResult>> {
 		private final Function<? super ServiceReference<T>, OSGi<? extends S>>
 			_fun;
 		private final BundleContext _bundleContext;
-		private final Consumer<Tuple<S>> _addedSource;
+		private final Consumer<Tuple<S>> _op;
 
 		FlatMapServiceTrackerCustomizer(
 			Function<? super ServiceReference<T>, OSGi<? extends S>> fun,
-			BundleContext bundleContext, Consumer<Tuple<S>> addedSource) {
+			BundleContext bundleContext, Consumer<Tuple<S>> op) {
 
 			_fun = fun;
 			_bundleContext = bundleContext;
-			_addedSource = addedSource;
+			_op = op;
 		}
 
 		@Override
-        public AtomicReference<OSGiResult<S>> addingService(
+        public AtomicReference<OSGiResult> addingService(
         	ServiceReference<T> reference) {
 
-			OSGiResultImpl<S> osgiResult = doFlatMap(reference);
+			OSGiResultImpl osgiResult = doFlatMap(reference);
 
 			return new AtomicReference<>(osgiResult);
         }
 
-		private OSGiResultImpl<S> doFlatMap(ServiceReference<T> reference) {
+		private OSGiResultImpl doFlatMap(ServiceReference<T> reference) {
 			OSGiImpl<S> program = (OSGiImpl<S>) _fun.apply(reference);
 
-			OSGiResultImpl<S> osgiResult = program._operation.run(
-				_bundleContext);
+			OSGiResultImpl result = program._operation.run(_bundleContext, _op);
 
-			osgiResult.pipeTo(_addedSource);
+			result.start();
 
-			return osgiResult;
+			return result;
 		}
 
 		@Override
         public void modifiedService(
         	ServiceReference<T> reference,
-			AtomicReference<OSGiResult<S>> tracked) {
+			AtomicReference<OSGiResult> tracked) {
 
 			tracked.get().close();
 
@@ -169,7 +163,7 @@ public class ServiceReferenceOSGi<T> ext
 		@Override
         public void removedService(
             ServiceReference<T> reference,
-			AtomicReference<OSGiResult<S>> tracked) {
+			AtomicReference<OSGiResult> tracked) {
 
             tracked.get().close();
         }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -33,12 +33,12 @@ public class ServiceRegistrationOSGiImpl
 	public ServiceRegistrationOSGiImpl(
 		Class<T> clazz, T service, Map<String, Object> properties) {
 
-		super(bundleContext -> {
-			ServiceRegistration<T> serviceRegistration =
+		super((bundleContext, op) -> {
+			ServiceRegistration<?> serviceRegistration =
 				bundleContext.registerService(
 					clazz, service, getProperties(properties));
 
-			return getServiceRegistrationOSGiResult(serviceRegistration);
+			return getServiceRegistrationOSGiResult(serviceRegistration, op);
 		});
 	}
 
@@ -46,25 +46,24 @@ public class ServiceRegistrationOSGiImpl
 		Class<T> clazz, ServiceFactory<T> serviceFactory,
 		Map<String, Object> properties) {
 
-		super(bundleContext -> {
-			ServiceRegistration<T> serviceRegistration =
+		super((bundleContext, op) -> {
+			ServiceRegistration<?> serviceRegistration =
 				bundleContext.registerService(
 					clazz, serviceFactory, getProperties(properties));
 
-			return getServiceRegistrationOSGiResult(serviceRegistration);
+			return getServiceRegistrationOSGiResult(serviceRegistration, op);
 		});
 	}
 
 	public ServiceRegistrationOSGiImpl(
 		String[] clazz, Object service, Map<String, ?> properties) {
 
-		super(bundleContext -> {
+		super((bundleContext, op) -> {
 			ServiceRegistration<?> serviceRegistration =
 				bundleContext.registerService(
 					clazz, service, new Hashtable<>(properties));
 
-			return getServiceRegistrationOSGiResult(
-				(ServiceRegistration)serviceRegistration);
+			return getServiceRegistrationOSGiResult(serviceRegistration, op);
 		});
 	}
 
@@ -78,22 +77,15 @@ public class ServiceRegistrationOSGiImpl
 		return new Hashtable<>(properties);
 	}
 
-	private static <T> OSGiResultImpl<ServiceRegistration<T>>
+	private static <T> OSGiResultImpl
 		getServiceRegistrationOSGiResult(
-			ServiceRegistration<T> serviceRegistration) {
-
-		Pipe<ServiceRegistration<T>, ServiceRegistration<T>> added =
-			Pipe.create();
-
-		Consumer<Tuple<ServiceRegistration<T>>> addedSource =
-            added.getSource();
+		ServiceRegistration<?> serviceRegistration,
+		Consumer<Tuple<ServiceRegistration<T>>> op) {
 
-		Tuple<ServiceRegistration<T>> tuple = Tuple.create(
-            serviceRegistration);
+		Tuple<ServiceRegistration<?>> tuple = Tuple.create(serviceRegistration);
 
-		return new OSGiResultImpl<>(
-            added,
-            () -> addedSource.accept(tuple),
+		return new OSGiResultImpl(
+            () -> ((Consumer)op).accept(tuple),
             () -> {
                 try {
                     serviceRegistration.unregister();

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java Tue Oct 10 15:51:22 2017
@@ -25,7 +25,6 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -33,21 +32,23 @@ import java.util.function.Function;
  */
 class Tuple<T> implements Event<T>, SentEvent<T> {
 
-	public T t;
-	private Deque<Runnable> _closingHandlers = new LinkedList<>();
-	private DoublyLinkedList<Tuple<?>> _relatedTuples =
-		new DoublyLinkedList<>();
-	private AtomicBoolean closed = new AtomicBoolean(false);
+	public final T _t;
+	private final Deque<Runnable> _closingHandlers;
+	private final DoublyLinkedList<Tuple<?>> _relatedTuples;
+	private final AtomicBoolean closed = new AtomicBoolean(false);
 	private Event<T> cause = this;
 
 	private Tuple(T t) {
-		this(t, new LinkedList<>());
+		this(t, new LinkedList<>(), new DoublyLinkedList<>());
 	}
 
-	private Tuple(T t, Deque<Runnable> closingHandlers) {
-		this.t = t;
+	private Tuple(
+		T t, Deque<Runnable> closingHandlers,
+		DoublyLinkedList<Tuple<?>> relatedTuples) {
 
+		_t = t;
 		_closingHandlers = closingHandlers;
+		_relatedTuples = relatedTuples;
 	}
 
 	public void addRelatedTuple(Tuple<?> tuple) {
@@ -67,12 +68,12 @@ class Tuple<T> implements Event<T>, Sent
 
 	@Override
 	public T getContent() {
-		return t;
+		return _t;
 	}
 
 	@Override
 	public int hashCode() {
-		return t.hashCode();
+		return _t.hashCode();
 	}
 
 	@Override
@@ -85,7 +86,7 @@ class Tuple<T> implements Event<T>, Sent
 	}
 
 	public <S> Tuple<S> map(Function<? super T, ? extends S> fun) {
-		return new Tuple<>(fun.apply(t), _closingHandlers);
+		return new Tuple<>(fun.apply(_t), _closingHandlers, _relatedTuples);
 	}
 
 	public void onTermination(Runnable terminator) {

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java Tue Oct 10 15:51:22 2017
@@ -22,6 +22,7 @@ import org.apache.aries.osgi.functional.
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
@@ -110,7 +111,7 @@ public class ComponentTest {
 
         Configuration factoryConfiguration = null;
 
-        try (OSGiResult<?> run = program.run(_bundleContext)) {
+        try (OSGiResult run = program.run(_bundleContext)) {
             factoryConfiguration = _configurationAdmin.createFactoryConfiguration(
                 "org.components.MyComponent");
             factoryConfiguration.update(new Hashtable<>());
@@ -222,7 +223,7 @@ public class ComponentTest {
 
         Configuration factoryConfiguration = null;
 
-        try (OSGiResult<?> run = program.run(_bundleContext)) {
+        try (OSGiResult run = program.run(_bundleContext)) {
             factoryConfiguration =
                 _configurationAdmin.createFactoryConfiguration(
                     "org.components.MyComponent");

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java Tue Oct 10 15:51:22 2017
@@ -70,7 +70,7 @@ public class DSLTest {
 
         assertEquals(0, atomicInteger.get());
 
-        try (OSGiResult<Integer> result = just.run(
+        try (OSGiResult result = just.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(25, atomicInteger.get());
@@ -80,7 +80,7 @@ public class DSLTest {
 
         OSGi<Integer> map = just(25).map(s -> s + 5);
 
-        try (OSGiResult<Integer> result = map.run(
+        try (OSGiResult result = map.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(30, atomicInteger.get());
@@ -90,7 +90,7 @@ public class DSLTest {
 
         OSGi<Integer> flatMap = just(25).flatMap(s -> just(s + 10));
 
-        try (OSGiResult<Integer> result = flatMap.run(
+        try (OSGiResult result = flatMap.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(35, atomicInteger.get());
@@ -100,7 +100,7 @@ public class DSLTest {
 
         OSGi<Integer> filter = just(25).filter(s -> s % 2 == 0);
 
-        try (OSGiResult<Integer> result = filter.run(
+        try (OSGiResult result = filter.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(0, atomicInteger.get());
@@ -110,7 +110,7 @@ public class DSLTest {
 
         filter = just(25).filter(s -> s % 2 != 0);
 
-        try (OSGiResult<Integer> result = filter.run(
+        try (OSGiResult result = filter.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(25, atomicInteger.get());
@@ -126,7 +126,7 @@ public class DSLTest {
         ServiceRegistration<Service> serviceRegistration = null;
 
         try(
-            OSGiResult<ServiceReference<Service>> osGiResult =
+            OSGiResult osGiResult =
                 serviceReferences(Service.class).
                 run(bundleContext, atomicReference::set)
         ) {
@@ -159,7 +159,7 @@ public class DSLTest {
         ServiceRegistration<Service> serviceRegistration = null;
 
         try(
-            OSGiResult<?> osGiResult = program.run(
+            OSGiResult osGiResult = program.run(
             bundleContext, atomicReference::set)
         ) {
             assertNull(atomicReference.get());
@@ -194,7 +194,7 @@ public class DSLTest {
 
         CountDownLatch countDownLatch = new CountDownLatch(1);
 
-        try(OSGiResult<Dictionary<String, ?>> result =
+        try(OSGiResult result =
             configuration("test.configuration").run(
                 bundleContext,
                 x -> {
@@ -239,7 +239,7 @@ public class DSLTest {
 
         Configuration configuration = null;
 
-        try(OSGiResult<Dictionary<String, ?>> result =
+        try(OSGiResult result =
             configurations("test.configuration").run(
                 bundleContext,
                 x -> {
@@ -275,7 +275,7 @@ public class DSLTest {
 
         Service service = new Service();
 
-        OSGiResult<ServiceRegistration<Service>> result = register(
+        OSGiResult result = register(
             Service.class, service, new HashMap<>()).
             run(bundleContext);
 
@@ -313,7 +313,7 @@ public class DSLTest {
                 }})
             );
 
-        OSGiResult<ServiceRegistration<Service>> result = program.run(
+        OSGiResult result = program.run(
             bundleContext);
 
         assertEquals(
@@ -454,7 +454,7 @@ public class DSLTest {
             program = services(filter).then(program);
         }
 
-        try (OSGiResult<?> result = program.run(bundleContext)) {
+        try (OSGiResult result = program.run(bundleContext)) {
             assertFalse(closed.get());
             assertFalse(executed.get());
 
@@ -509,7 +509,7 @@ public class DSLTest {
 
         assertNull(current.get());
 
-        try (OSGiResult<Void> result = program.run(bundleContext)) {
+        try (OSGiResult result = program.run(bundleContext)) {
             ServiceRegistration<Service> serviceRegistrationOne =
                 bundleContext.registerService(
                     Service.class, new Service(),