You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/11/17 16:00:41 UTC

svn commit: r1815581 - in /aries/trunk/component-dsl: component-dsl/src/main/java/org/apache/aries/osgi/functional/ component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ itests/src/main/java/org/apache/aries/osgi/functional/internal/ i...

Author: csierra
Date: Fri Nov 17 16:00:41 2017
New Revision: 1815581

URL: http://svn.apache.org/viewvc?rev=1815581&view=rev
Log:
[Component-DSL] Refactor and source formatting

Added:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java
      - copied, changed from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java
      - copied, changed from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
Removed:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Event.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java Fri Nov 17 16:00:41 2017
@@ -31,9 +31,6 @@ import java.util.stream.Collectors;
 public class CachingServiceReference<T>
     implements Comparable<CachingServiceReference<T>> {
 
-    private final ConcurrentHashMap<String, Object> _properties;
-    private final ServiceReference<T> _serviceReference;
-
     public CachingServiceReference(ServiceReference<T> serviceReference) {
         _properties = new ConcurrentHashMap<>();
         _serviceReference = serviceReference;
@@ -140,12 +137,6 @@ public class CachingServiceReference<T>
      */
     public ServiceReference<T> getServiceReference() {
         return _serviceReference;
-    }    @Override
-    public String toString() {
-        return "CachingServiceReference{" +
-            "cachedProperties=" + _properties + ", " +
-            "serviceReference=" + _serviceReference +
-            '}';
     }
 
     @Override
@@ -162,7 +153,6 @@ public class CachingServiceReference<T>
 
         return _serviceReference.equals(that._serviceReference);
     }
-
     /**
      * Checks if any of the cached properties has a different value in the
      * underlying {@link ServiceReference}. Only properties that have been
@@ -180,6 +170,14 @@ public class CachingServiceReference<T>
         );
     }
 
+    @Override
+    public String toString() {
+        return "CachingServiceReference{" +
+            "cachedProperties=" + _properties + ", " +
+            "serviceReference=" + _serviceReference +
+            '}';
+    }
+
     /**
      * Checks if the property is dirty in this instance without caching the
      * value. Trying to do the same using getProperty would cache the property
@@ -194,6 +192,9 @@ public class CachingServiceReference<T>
             !value.equals(_serviceReference.getProperty(key));
     }
 
+    private final ConcurrentHashMap<String, Object> _properties;
+    private final ServiceReference<T> _serviceReference;
+
     private static class NULL {
         private static NULL INSTANCE = new NULL();
 
@@ -208,6 +209,4 @@ public class CachingServiceReference<T>
         }
     }
 
-
-
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Fri Nov 17 16:00:41 2017
@@ -59,42 +59,12 @@ import java.util.function.Supplier;
 public interface OSGi<T> extends OSGiRunnable<T> {
 	Runnable NOOP = () -> {};
 
-	<S> OSGi<S> choose(
-		Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
-		Function<OSGi<T>, OSGi<S>> otherwise);
-
-	<S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
-
-	<K, S> OSGi<S> splitBy(
-		Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
-
-	OSGi<T> recover(BiFunction<T, Exception, T> onError);
-
-	OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
-
-	OSGi<T> effects(
-		Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
-
-	<S> OSGi<S> map(Function<? super T, ? extends S> function);
-
-	<S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
-
-	<S> OSGi<S> then(OSGi<S> next);
-
-	OSGi<Void> foreach(Consumer<? super T> onAdded);
-
-	OSGi<Void> foreach(
-		Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
-
-	<S> OSGi<S> transform(
-		Function<Function<S, Runnable>, Function<T, Runnable>> fun);
-
-	static OSGi<Void> ignore(OSGi<?> program) {
-		return new IgnoreImpl(program);
+	@SafeVarargs
+	static <T> OSGi<T> all(OSGi<T> ... programs) {
+		return new AllOSGi<>(programs);
 	}
 
 	static OSGi<BundleContext> bundleContext() {
-
 		return new BundleContextOSGiImpl();
 	}
 
@@ -108,6 +78,26 @@ public interface OSGi<T> extends OSGiRun
 		return new ChangeContextOSGiImpl<>(program, bundleContext);
 	}
 
+	static <A, B, C> OSGi<C> combine(Function2<A, B, C> fun, OSGi<A> a, OSGi<B> b) {
+		return b.applyTo(a.applyTo(just(fun.curried())));
+	}
+
+	static <A, B, C, D> OSGi<D> combine(Function3<A, B, C, D> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c) {
+		return c.applyTo(OSGi.combine((A aa, B bb) -> fun.curried().apply(aa).apply(bb), a, b));
+	}
+
+	static <A, B, C, D, E> OSGi<E> combine(Function4<A, B, C, D, E> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d) {
+		return d.applyTo(OSGi.combine((A aa, B bb, C cc) -> fun.curried().apply(aa).apply(bb).apply(cc), a, b, c));
+	}
+
+	static <A, B, C, D, E, F> OSGi<F> combine(Function5<A, B, C, D, E, F> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e) {
+		return e.applyTo(OSGi.combine((A aa, B bb, C cc, D dd) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd), a, b, c, d));
+	}
+
+	static <A, B, C, D, E, F, G> OSGi<G> combine(Function6<A, B, C, D, E, F, G> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e, OSGi<F> f) {
+		return f.applyTo(OSGi.combine((A aa, B bb, C cc, D dd, E ee) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd).apply(ee), a, b, c, d, e));
+	}
+
 	static OSGi<Dictionary<String, ?>> configuration(String pid) {
 		return new ConfigurationOSGiImpl(pid);
 	}
@@ -116,6 +106,14 @@ public interface OSGi<T> extends OSGiRun
 		return new ConfigurationsOSGiImpl(factoryPid);
 	}
 
+	static OSGi<Void> ignore(OSGi<?> program) {
+		return new IgnoreImpl(program);
+	}
+
+	static <S> OSGi<S> join(OSGi<OSGi<S>> program) {
+		return program.flatMap(x -> x);
+	}
+
 	static <S> OSGi<S> just(S s) {
 		return new JustOSGiImpl<>(s);
 	}
@@ -128,10 +126,6 @@ public interface OSGi<T> extends OSGiRun
 		return new JustOSGiImpl<>(() -> Collections.singletonList(s.get()));
 	}
 
-	static <S> OSGi<S> join(OSGi<OSGi<S>> program) {
-		return program.flatMap(x -> x);
-	}
-
 	static <S> OSGi<S> nothing() {
 		return new NothingOSGiImpl<>();
 	}
@@ -140,6 +134,28 @@ public interface OSGi<T> extends OSGiRun
 		return new OnCloseOSGiImpl(action);
 	}
 
+	static <T> OSGi<T> once(OSGi<T> program) {
+		return program.transform(op -> {
+			AtomicInteger count = new AtomicInteger();
+
+			AtomicReference<Runnable> terminator = new AtomicReference<>();
+
+			return t -> {
+				if (count.getAndIncrement() == 0) {
+					terminator.set(op.apply(t));
+				}
+
+				return () -> {
+					if (count.decrementAndGet() == 0) {
+						Runnable runnable = terminator.getAndSet(NOOP);
+
+						runnable.run();
+					}
+				};
+			};
+		});
+	}
+
 	static OSGi<ServiceObjects<Object>> prototypes(String filterString) {
 		return prototypes(null, filterString);
 	}
@@ -180,55 +196,6 @@ public interface OSGi<T> extends OSGiRun
 		return new ServiceRegistrationOSGiImpl(classes, service, properties);
 	}
 
-	static <T> OSGi<T> services(Class<T> clazz) {
-		return services(clazz, null);
-	}
-
-	static <T> OSGi<Object> services(String filterString) {
-		return services(null, filterString);
-	}
-
-	static <T> OSGi<T> services(Class<T> clazz, String filterString) {
-		return
-			bundleContext().flatMap(
-			bundleContext ->
-
-			serviceReferences(clazz, filterString).map(
-				CachingServiceReference::getServiceReference
-			).flatMap(
-				sr -> {
-					T service = bundleContext.getService(sr);
-
-					return
-						onClose(() -> bundleContext.ungetService(sr)).then(
-						just(service)
-					);
-			}
-		));
-	}
-
-	public static <T> OSGi<T> once(OSGi<T> program) {
-		return program.transform(op -> {
-			AtomicInteger count = new AtomicInteger();
-
-			AtomicReference<Runnable> terminator = new AtomicReference<>();
-
-			return t -> {
-				if (count.getAndIncrement() == 0) {
-					terminator.set(op.apply(t));
-				}
-
-				return () -> {
-					if (count.decrementAndGet() == 0) {
-						Runnable runnable = terminator.getAndSet(NOOP);
-
-						runnable.run();
-					}
-				};
-			};
-		});
-	}
-
 	static <T> OSGi<CachingServiceReference<T>> serviceReferences(
 		Class<T> clazz) {
 
@@ -267,35 +234,66 @@ public interface OSGi<T> extends OSGiRun
 		return new ServiceReferenceOSGi<>(filterString, null, onModified);
 	}
 
-	@SafeVarargs
-	static <T> OSGi<T> all(OSGi<T> ... programs) {
-		return new AllOSGi<>(programs);
+	static <T> OSGi<T> services(Class<T> clazz) {
+		return services(clazz, null);
 	}
 
-	OSGi<T> filter(Predicate<T> predicate);
-
-	public default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-		return fun.flatMap(this::map);
+	static <T> OSGi<Object> services(String filterString) {
+		return services(null, filterString);
 	}
 
-	public static <A, B, C> OSGi<C> combine(Function2<A, B, C> fun, OSGi<A> a, OSGi<B> b) {
-		return b.applyTo(a.applyTo(just(fun.curried())));
-	}
+	static <T> OSGi<T> services(Class<T> clazz, String filterString) {
+		return
+			bundleContext().flatMap(
+			bundleContext ->
 
-	public static <A, B, C, D> OSGi<D> combine(Function3<A, B, C, D> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c) {
-		return c.applyTo(OSGi.combine((A aa, B bb) -> fun.curried().apply(aa).apply(bb), a, b));
-	}
+			serviceReferences(clazz, filterString).map(
+				CachingServiceReference::getServiceReference
+			).flatMap(
+				sr -> {
+					T service = bundleContext.getService(sr);
 
-	public static <A, B, C, D, E> OSGi<E> combine(Function4<A, B, C, D, E> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d) {
-		return d.applyTo(OSGi.combine((A aa, B bb, C cc) -> fun.curried().apply(aa).apply(bb).apply(cc), a, b, c));
+					return
+						onClose(() -> bundleContext.ungetService(sr)).then(
+						just(service)
+					);
+			}
+		));
 	}
 
-	public static <A, B, C, D, E, F> OSGi<F> combine(Function5<A, B, C, D, E, F> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e) {
-		return e.applyTo(OSGi.combine((A aa, B bb, C cc, D dd) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd), a, b, c, d));
+	default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+		return fun.flatMap(this::map);
 	}
 
-	public static <A, B, C, D, E, F, G> OSGi<G> combine(Function6<A, B, C, D, E, F, G> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e, OSGi<F> f) {
-		return f.applyTo(OSGi.combine((A aa, B bb, C cc, D dd, E ee) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd).apply(ee), a, b, c, d, e));
-	}
+	<S> OSGi<S> choose(
+		Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
+		Function<OSGi<T>, OSGi<S>> otherwise);
+
+	<S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
+
+	OSGi<T> effects(
+		Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
+
+	OSGi<T> filter(Predicate<T> predicate);
+
+	<S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
+
+	OSGi<Void> foreach(Consumer<? super T> onAdded);
+
+	OSGi<Void> foreach(
+		Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
+
+	<S> OSGi<S> map(Function<? super T, ? extends S> function);
+
+	OSGi<T> recover(BiFunction<T, Exception, T> onError);
+
+	OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
+
+	<K, S> OSGi<S> splitBy(
+		Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
+
+	<S> OSGi<S> then(OSGi<S> next);
+
+	<S> OSGi<S> transform(Transformer<T, S> fun);
 
 }

Copied: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java (from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java)
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java&p1=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java&r1=1815580&r2=1815581&rev=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java Fri Nov 17 16:00:41 2017
@@ -17,12 +17,17 @@
 
 package org.apache.aries.osgi.functional;
 
+import java.util.function.Function;
+
 /**
  * @author Carlos Sierra Andrés
  */
-public interface SentEvent<T> {
+public interface Publisher<T> extends Function<T, Runnable> {
+
+    default Runnable apply(T t) {
+        return publish(t);
+    }
 
-    Event<T> getEvent();
+    Runnable publish(T t);
 
-    void terminate();
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java Fri Nov 17 16:00:41 2017
@@ -6,5 +6,13 @@ import java.util.function.Function;
  * @author Carlos Sierra Andrés
  */
 public interface Transformer<T, R> extends
-    Function<Function<R, Runnable>, Function<T, Runnable>> {
+    Function<Publisher<R>, Publisher<T>> {
+
+    @Override
+    default Publisher<T> apply(Publisher<R> pipe) {
+        return transform(pipe);
+    }
+
+    Publisher<T> transform(Publisher<R> pipe);
+
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java Fri Nov 17 16:00:41 2017
@@ -1,24 +1,19 @@
 package org.apache.aries.osgi.functional;
 
-import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList;
+import org.apache.aries.osgi.functional.internal.AccumulateTransformer;
 import org.apache.aries.osgi.functional.internal.HighestRankingOSGi;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiPredicate;
 import java.util.function.Function;
 
-import static org.apache.aries.osgi.functional.OSGi.NOOP;
-
 /**
  * @author Carlos Sierra Andrés
  */
 public interface Utils {
 
-    static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
-        return highest(program, Comparator.naturalOrder());
+    static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
+        return program.transform(new AccumulateTransformer<>());
     }
 
     static <T> OSGi<T> highest(
@@ -28,58 +23,14 @@ public interface Utils {
     }
 
     static <T> OSGi<T> highest(
-        OSGi<T> program, Comparator<? super T> comparator, Function<OSGi<T>, OSGi<T>> notHighest) {
+        OSGi<T> program, Comparator<? super T> comparator,
+        Function<OSGi<T>, OSGi<T>> notHighest) {
 
         return new HighestRankingOSGi<>(program, comparator, notHighest);
     }
 
-    static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
-        return program.transform(op -> {
-            ConcurrentDoublyLinkedList<T> list =
-                new ConcurrentDoublyLinkedList<>();
-
-            AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
-
-            return t -> {
-                ConcurrentDoublyLinkedList.Node node = list.addLast(t);
-
-                publish(op, list, terminator);
-
-                return () -> {
-                    node.remove();
-
-                    publish(op, list, terminator);
-                };
-            };
-        });
-    }
-
-    static <T> void publish(Function<List<T>, Runnable> op, ConcurrentDoublyLinkedList<T> list, AtomicReference<Runnable> terminator) {
-        Runnable runnable = terminator.get();
-
-        runnable.run();
-
-        terminator.set(op.apply(new ArrayList<>(list)));
-    }
-
-    static <T> OSGi<T> republishIf(
-        BiPredicate<T, T> refresher, OSGi<T> program) {
-
-        return program.transform(op -> {
-            AtomicReference<T> old = new AtomicReference<>();
-            AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
-
-            return t -> {
-                if (refresher.test(old.get(), t)) {
-                    terminator.get().run();
-
-                    old.set(t);
-                    terminator.set(op.apply(t));
-                }
-
-                return () -> {};
-            };
-        });
+    static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
+        return highest(program, Comparator.naturalOrder());
     }
 
 }

Added: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java?rev=1815581&view=auto
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java (added)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java Fri Nov 17 16:00:41 2017
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.Publisher;
+import org.apache.aries.osgi.functional.Transformer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class AccumulateTransformer<T> implements Transformer<T, List<T>> {
+
+    @Override
+    public Publisher<T> transform(Publisher<List<T>> op) {
+        ConcurrentDoublyLinkedList<T> list =
+            new ConcurrentDoublyLinkedList<>();
+
+        AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
+
+        return t -> {
+            ConcurrentDoublyLinkedList.Node node = list.addLast(t);
+
+            publish(op, list, terminator);
+
+            return () -> {
+                node.remove();
+
+                publish(op, list, terminator);
+            };
+        };
+    }
+
+    private static <T> void publish(
+        Function<List<T>, Runnable> op, ConcurrentDoublyLinkedList<T> list,
+        AtomicReference<Runnable> terminator) {
+
+        Runnable runnable = terminator.get();
+
+        runnable.run();
+
+        terminator.set(op.apply(new ArrayList<>(list)));
+    }
+
+}

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java Fri Nov 17 16:00:41 2017
@@ -17,23 +17,16 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleEvent;
 import org.osgi.util.tracker.BundleTracker;
 import org.osgi.util.tracker.BundleTrackerCustomizer;
 
-import java.util.function.Consumer;
-import java.util.function.Function;
-
 /**
  * @author Carlos Sierra Andrés
  */
 public class BundleOSGi extends OSGiImpl<Bundle> {
 
-	private final int _stateMask;
-
 	public BundleOSGi(int stateMask) {
 		super((bundleContext, op) -> {
 			BundleTracker<Runnable> bundleTracker =
@@ -68,54 +61,6 @@ public class BundleOSGi extends OSGiImpl
 				bundleTracker::open, bundleTracker::close);
 		});
 
-		_stateMask = stateMask;
-	}
-
-	@Override
-	public <S> OSGiImpl<S> flatMap(
-		Function<? super Bundle, OSGi<? extends S>> fun) {
-
-		return new OSGiImpl<>((bundleContext, op) -> {
-			BundleTracker<OSGiResult> bundleTracker =
-				new BundleTracker<>(
-					bundleContext, _stateMask,
-					new BundleTrackerCustomizer<OSGiResult>() {
-
-						@Override
-						public OSGiResult addingBundle(
-							Bundle bundle, BundleEvent bundleEvent) {
-
-							OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(
-								bundle);
-
-							OSGiResultImpl result = program._operation.run(
-								bundleContext, op);
-
-							result.start();
-
-							return result;
-						}
-
-						@Override
-						public void modifiedBundle(
-							Bundle bundle, BundleEvent bundleEvent,
-							OSGiResult result) {
-
-						}
-
-						@Override
-						public void removedBundle(
-							Bundle bundle, BundleEvent bundleEvent,
-							OSGiResult result) {
-
-							result.close();
-						}
-					});
-
-			return new OSGiResultImpl(
-				bundleTracker::open, bundleTracker::close);
-
-		});
 	}
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java Fri Nov 17 16:00:41 2017
@@ -28,8 +28,7 @@ import java.util.concurrent.atomic.Atomi
 /**
  * @author Carlos Sierra Andrés
  */
-public class ConfigurationOSGiImpl
-	extends OSGiImpl<Dictionary<String, ?>> {
+public class ConfigurationOSGiImpl extends OSGiImpl<Dictionary<String, ?>> {
 
 	public ConfigurationOSGiImpl(String pid) {
 		super((bundleContext, op) -> {

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java Fri Nov 17 16:00:41 2017
@@ -40,10 +40,10 @@ public class HighestRankingOSGi<T> exten
                 comparing.reversed());
             AtomicReference<Tuple<T>> sent = new AtomicReference<>();
 
-            Function<T, Runnable> notHighestPipe = ProbeImpl.getProbePipe(
-                notHighest, bundleContext, __ -> () -> {});
+            Pad<T, T> notHighestPad = new Pad<>(
+                bundleContext, notHighest, __ -> NOOP);
 
-            return ((OSGiImpl<T>)previous)._operation.run(
+            OSGiResultImpl result = ((OSGiImpl<T>) previous)._operation.run(
                 bundleContext,
                 t -> {
                     Tuple<T> tuple = new Tuple<>(t);
@@ -57,15 +57,14 @@ public class HighestRankingOSGi<T> exten
                             if (old != null) {
                                 old._runnable.run();
 
-                                old._runnable = notHighestPipe.apply(old._t);
+                                old._runnable = notHighestPad.publish(old._t);
                             }
 
                             tuple._runnable = highestPipe.apply(t);
 
                             sent.set(tuple);
-                        }
-                        else {
-                            tuple._runnable = notHighestPipe.apply(t);
+                        } else {
+                            tuple._runnable = notHighestPad.publish(t);
                         }
                     }
 
@@ -91,14 +90,19 @@ public class HighestRankingOSGi<T> exten
                         }
                     };
                 });
+
+            return new OSGiResultImpl(
+                result::start,
+                () -> {
+                    result.close();
+
+                    notHighestPad.close();
+                });
         });
     }
 
     private static class Tuple<T> {
 
-        T _t;
-        Runnable _runnable;
-
         Tuple(T t) {
             _t = t;
         }
@@ -106,6 +110,8 @@ public class HighestRankingOSGi<T> exten
         public T getT() {
             return _t;
         }
+        T _t;
+        Runnable _runnable;
 
     }
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java Fri Nov 17 16:00:41 2017
@@ -27,8 +27,7 @@ public class IgnoreImpl extends OSGiImpl
     public IgnoreImpl(OSGi<?> program) {
 
         super((bundleContext, op) ->
-            ((OSGiImpl<?>) program)._operation.run(
-                bundleContext, t -> () -> {}));
+            ((OSGiImpl<?>) program)._operation.run(bundleContext, t -> NOOP));
     }
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Fri Nov 17 16:00:41 2017
@@ -19,7 +19,8 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.Utils;
+import org.apache.aries.osgi.functional.Publisher;
+import org.apache.aries.osgi.functional.Transformer;
 import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList.Node;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
@@ -47,27 +48,76 @@ public class OSGiImpl<T> implements OSGi
 	}
 
 	@Override
-	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
-		return new FlatMapImpl<>(this, fun);
-	}
+	public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+		return new OSGiImpl<>(
+			(bundleContext, op) -> {
+				AtomicReference<OSGiResult> myCloseReference =
+					new AtomicReference<>();
 
-	@Override
-	public OSGi<Void> foreach(Consumer<? super T> onAdded) {
-		return foreach(onAdded, ign -> {});
-	}
+				AtomicReference<OSGiResult> otherCloseReference =
+					new AtomicReference<>();
 
-	@Override
-	public OSGi<Void> foreach(
-		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+				ConcurrentDoublyLinkedList<T> identities =
+					new ConcurrentDoublyLinkedList<>();
 
-		return OSGi.ignore(effects(onAdded, onRemoved));
-	}
+				ConcurrentDoublyLinkedList<Function<T, S>> funs =
+					new ConcurrentDoublyLinkedList<>();
 
-	@Override
-	public <S> OSGi<S> transform(
-		Function<Function<S, Runnable>, Function<T, Runnable>> fun) {
+				return new OSGiResultImpl(
+					() -> {
+						OSGiResultImpl or1 = _operation.run(
+							bundleContext,
+							t -> {
+								Node node = identities.addLast(t);
 
-		return new TransformerOSGi<>(this, fun);
+								List<Runnable> terminators = funs.stream().map(
+									f -> op.apply(f.apply(t))
+								).collect(
+									Collectors.toList()
+								);
+
+								return () -> {
+									node.remove();
+
+									terminators.forEach(Runnable::run);
+								};
+							}
+						);
+
+						myCloseReference.set(or1);
+
+						OSGiResultImpl funRun =
+							((OSGiImpl<Function<T, S>>) fun)._operation.run(
+								bundleContext,
+								f -> {
+									Node node = funs.addLast(f);
+
+									List<Runnable> terminators =
+										identities.stream().map(
+											t -> op.apply(f.apply(t))
+										).collect(
+											Collectors.toList()
+										);
+
+									return () -> {
+										node.remove();
+
+										terminators.forEach(Runnable::run);
+									};
+								});
+
+						otherCloseReference.set(funRun);
+
+						or1.start();
+
+						funRun.start();
+					},
+					() -> {
+						myCloseReference.get().close();
+
+						otherCloseReference.get().close();
+					});
+			});
 	}
 
 	@Override
@@ -76,20 +126,25 @@ public class OSGiImpl<T> implements OSGi
 		Function<OSGi<T>, OSGi<S>> otherwise) {
 
 		return new OSGiImpl<>((bundleContext, publisher) -> {
-			Function<T, Runnable> thenPipe = ProbeImpl.getProbePipe(then, bundleContext, publisher);
-
-			Function<T, Runnable> elsePipe = ProbeImpl.getProbePipe(otherwise, bundleContext, publisher);
+			Pad<T, S> thenPad = new Pad<>(bundleContext, then, publisher);
+			Pad<T, S> elsePad = new Pad<>(bundleContext, otherwise, publisher);
 
-			return _operation.run(
+			OSGiResultImpl result = _operation.run(
 				bundleContext,
 				t -> {
 					if (chooser.test(t)) {
-						return thenPipe.apply(t);
-					}
-					else {
-						return elsePipe.apply(t);
+						return thenPad.publish(t);
+					} else {
+						return elsePad.publish(t);
 					}
 				});
+			return new OSGiResultImpl(
+				result::start,
+				() -> {
+					thenPad.close();
+					elsePad.close();
+					result.close();
+				});
 		});
 	}
 
@@ -97,60 +152,102 @@ public class OSGiImpl<T> implements OSGi
 	@SafeVarargs
 	public final <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs) {
 		return new OSGiImpl<>((bundleContext, publisher) -> {
-			List<Function<T, Runnable>> pipes =
+			List<Pad<T, S>> pads =
 				Arrays.stream(
 					funs
 				).map(
-					fun -> ProbeImpl.getProbePipe(fun, bundleContext, publisher)
+					fun -> new Pad<>(bundleContext, fun, publisher)
 				).collect(
 					Collectors.toList()
 			);
 
-			return _operation.run(
+			OSGiResultImpl result = _operation.run(
 				bundleContext,
 				t -> {
 					List<Runnable> terminators =
-						pipes.stream().map(p -> p.apply(t)).collect(
+						pads.stream().map(p -> p.publish(t)).collect(
 							Collectors.toList());
 
 					return () -> {
 						terminators.forEach(Runnable::run);
 					};
 				});
+
+			return new OSGiResultImpl(
+				result::start,
+				() -> {
+					result.close();
+
+					pads.forEach(Pad::close);
+				});
 		});
 	}
 
 	@Override
-	public <K, S> OSGi<S> splitBy(
-		Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
-
-		return new OSGiImpl<>((bundleContext, op) -> {
-			HashMap<K, Function<T, Runnable>> pipes = new HashMap<>();
-			HashMap<K, OSGiResult> results = new HashMap<>();
+	public OSGi<T> effects(
+		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
-			return _operation.run(
+		return new OSGiImpl<>((bundleContext, op) ->
+			_operation.run(
 				bundleContext,
 				t -> {
-					K key = mapper.apply(t);
+					onAdded.accept(t);
 
-					results.computeIfAbsent(key, __ -> {
-						ProbeImpl<T> probe = new ProbeImpl<>();
+					Runnable terminator;
+					try {
+						terminator = op.apply(t);
+					}
+					catch (Exception e) {
+						onRemoved.accept(t);
 
-						OSGiImpl<S> program = (OSGiImpl<S>)fun.apply(probe);
+						throw e;
+					}
 
-						OSGiResult r = program._operation.run(
-							bundleContext, op);
+					return () -> {
+						onRemoved.accept(t);
 
-						r.start();
+						terminator.run();
+					};
+				}));
+	}
 
-						pipes.put(key, probe.getOperation());
+	@Override
+	public OSGi<T> filter(Predicate<T> predicate) {
+		return new OSGiImpl<>((bundleContext, op) ->
+			_operation.run(
+				bundleContext,
+				(t) -> {
+					if (predicate.test(t)) {
+						return op.apply(t);
+					}
+					else {
+						return () -> {};
+					}
+				}
+			));
+	}
 
-						return r;
-					});
+	@Override
+	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
+		return new FlatMapImpl<>(this, fun);
+	}
 
-					return pipes.get(key).apply(t);
-				});
-		});
+	@Override
+	public OSGi<Void> foreach(Consumer<? super T> onAdded) {
+		return foreach(onAdded, ign -> {});
+	}
+
+	@Override
+	public OSGi<Void> foreach(
+		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+
+		return OSGi.ignore(effects(onAdded, onRemoved));
+	}
+
+	@Override
+	public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
+		return new OSGiImpl<>((bundleContext, op) ->
+			_operation.run(bundleContext, t -> op.apply(function.apply(t))));
 	}
 
 	@Override
@@ -182,10 +279,7 @@ public class OSGiImpl<T> implements OSGi
 						OSGi<T> errorProgram = onError.apply(t, e);
 
 						OSGiResult result =
-							((OSGiImpl<T>) errorProgram)._operation.run(
-								bundleContext, op);
-
-						result.start();
+							((OSGiImpl<T>) errorProgram).run(bundleContext, op);
 
 						return result::close;
 					}
@@ -194,37 +288,40 @@ public class OSGiImpl<T> implements OSGi
 	}
 
 	@Override
-	public OSGi<T> effects(
-		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+	public <K, S> OSGi<S> splitBy(
+		Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
 
-		return new OSGiImpl<>((bundleContext, op) ->
-			_operation.run(
-				bundleContext,
-				t -> {
-					onAdded.accept(t);
+		return new OSGiImpl<>((bundleContext, op) -> {
+			HashMap<K, Pad<T, S>> pads = new HashMap<>();
 
-					Runnable terminator;
-					try {
-						terminator = op.apply(t);
-					}
-					catch (Exception e) {
-						onRemoved.accept(t);
+			OSGiResultImpl result = _operation.run(
+				bundleContext,
+				t ->
+					pads.computeIfAbsent(
+						mapper.apply(t),
+						__ -> new Pad<>(bundleContext, fun, op)
+					).apply(t)
+			);
 
-						throw e;
-					}
+			return new OSGiResultImpl(
+				result::start,
+				() -> {
+					pads.values().forEach(Pad::close);
 
-					return () -> {
-						onRemoved.accept(t);
+					result.close();
+				});
+		});
+	}
 
-						terminator.run();
-					};
-				}));
+	@Override
+	public <S> OSGi<S> then(OSGi<S> next) {
+		return flatMap(ignored -> next);
 	}
 
 	@Override
-	public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
-		return new OSGiImpl<>((bundleContext, op) ->
-			_operation.run(bundleContext, t -> op.apply(function.apply(t))));
+	public <S> OSGi<S> transform(Transformer<T, S> fun) {
+
+		return new TransformerOSGi<>(this, fun);
 	}
 
 	@Override
@@ -234,23 +331,15 @@ public class OSGiImpl<T> implements OSGi
 
 	@Override
 	public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) {
-		OSGiResultImpl osgiResult =
-			_operation.run(
-				bundleContext,
-				t -> {
-					andThen.accept(t);
+		return run(bundleContext, t -> {andThen.accept(t); return NOOP;});
+	}
 
-					return () -> {};
-				});
+	public OSGiResult run(BundleContext bundleContext, Publisher<T> op) {
+		OSGiResultImpl result = _operation.run(bundleContext, op);
 
-		osgiResult.start();
+		result.start();
 
-		return osgiResult;
-	}
-
-	@Override
-	public <S> OSGi<S> then(OSGi<S> next) {
-		return flatMap(ignored -> next);
+		return result;
 	}
 
 	static Filter buildFilter(
@@ -302,95 +391,6 @@ public class OSGiImpl<T> implements OSGi
 		return stringBuilder.toString();
 	}
 
-	@Override
-	public OSGi<T> filter(Predicate<T> predicate) {
-		return new OSGiImpl<>((bundleContext, op) ->
-			_operation.run(
-				bundleContext,
-				(t) -> {
-					if (predicate.test(t)) {
-						return op.apply(t);
-					}
-					else {
-						return () -> {};
-					}
-				}
-			));
-	}
-
-	@Override
-	public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-		return new OSGiImpl<>(
-			(bundleContext, op) -> {
-				AtomicReference<OSGiResult> myCloseReference =
-					new AtomicReference<>();
-
-				AtomicReference<OSGiResult> otherCloseReference =
-					new AtomicReference<>();
-
-				ConcurrentDoublyLinkedList<T> identities =
-					new ConcurrentDoublyLinkedList<>();
-
-				ConcurrentDoublyLinkedList<Function<T, S>> funs =
-					new ConcurrentDoublyLinkedList<>();
-
-				return new OSGiResultImpl(
-					() -> {
-						OSGiResultImpl or1 = _operation.run(
-							bundleContext,
-							t -> {
-								Node node = identities.addLast(t);
-
-								List<Runnable> terminators = funs.stream().map(
-									f -> op.apply(f.apply(t))
-								).collect(
-									Collectors.toList()
-								);
-
-								return () -> {
-									node.remove();
-
-									terminators.forEach(Runnable::run);
-								};
-							}
-						);
-
-						myCloseReference.set(or1);
-
-						OSGiResultImpl funRun =
-							((OSGiImpl<Function<T, S>>) fun)._operation.run(
-								bundleContext,
-								f -> {
-									Node node = funs.addLast(f);
-
-									List<Runnable> terminators =
-										identities.stream().map(
-											t -> op.apply(f.apply(t))
-										).collect(
-											Collectors.toList()
-										);
-
-									return () -> {
-										node.remove();
-
-										terminators.forEach(Runnable::run);
-									};
-								});
-
-						otherCloseReference.set(funRun);
-
-						or1.start();
-
-						funRun.start();
-					},
-					() -> {
-						myCloseReference.get().close();
-
-						otherCloseReference.get().close();
-					});
-			});
-	}
-
 }
 
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java Fri Nov 17 16:00:41 2017
@@ -19,6 +19,7 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiOperation;
 import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
 import org.osgi.framework.BundleContext;
 
 import java.util.function.Function;
@@ -28,8 +29,7 @@ import java.util.function.Function;
  */
 interface OSGiOperationImpl<T> extends OSGiOperation<T> {
 
-	OSGiResultImpl run(
-		BundleContext bundleContext, Function<T, Runnable> op);
+	OSGiResultImpl run(BundleContext bundleContext, Publisher<T> op);
 
 	@Override
 	default OSGiResult run(BundleContext bundleContext) {

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Fri Nov 17 16:00:41 2017
@@ -26,19 +26,25 @@ import java.util.concurrent.atomic.Atomi
  */
 public class OSGiResultImpl implements OSGiResult {
 
-	private final Runnable start;
-	private final Runnable close;
-	private AtomicBoolean _working = new AtomicBoolean();
-	private AtomicBoolean _closed = new AtomicBoolean();
-	private volatile boolean _started = false;
-
-
 	public OSGiResultImpl(Runnable start, Runnable close) {
 		this.start = start;
 		this.close = close;
 	}
 
 	@Override
+	public void close() {
+		while (!_working.compareAndSet(false, true)) {
+			Thread.yield();
+		}
+
+		if (_closed.compareAndSet(false, true) && _started) {
+			close.run();
+		}
+
+		_working.set(false);
+	}
+
+	@Override
 	public void start() {
 		if (_working.compareAndSet(false, true)) {
 
@@ -53,17 +59,10 @@ public class OSGiResultImpl implements O
 
 	}
 
-	@Override
-	public void close() {
-		while (!_working.compareAndSet(false, true)) {
-			Thread.yield();
-		}
-
-		if (_closed.compareAndSet(false, true) && _started) {
-			close.run();
-		}
-
-		_working.set(false);
-	}
+	private final Runnable start;
+	private final Runnable close;
+	private AtomicBoolean _working = new AtomicBoolean();
+	private AtomicBoolean _closed = new AtomicBoolean();
+	private volatile boolean _started = false;
 
 }

Copied: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java (from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java)
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java&p1=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java&r1=1815580&r2=1815581&rev=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java Fri Nov 17 16:00:41 2017
@@ -17,23 +17,48 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGiOperation;
+import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
 import org.osgi.framework.BundleContext;
 
+import java.io.Closeable;
 import java.util.function.Function;
 
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
 /**
  * @author Carlos Sierra Andrés
  */
-interface OSGiOperationImpl<T> extends OSGiOperation<T> {
+public class Pad<T, S> implements Publisher<T>, Closeable {
+
+    public Pad(
+        BundleContext bundleContext,
+        Function<OSGi<T>, OSGi<S>> fun,
+        Publisher<S> continuation) {
+
+        ProbeImpl<T> probe = new ProbeImpl<>();
+
+        OSGiImpl<S> next = (OSGiImpl<S>) fun.apply(probe);
+
+        _result = next.run(bundleContext, continuation);
+
+        _publisher =
+            probe.getPublisher() != null ?
+                probe.getPublisher() :
+                __ -> NOOP;
+    }
 
-	OSGiResultImpl run(
-		BundleContext bundleContext, Function<T, Runnable> op);
+    @Override
+    public void close() {
+        _result.close();
+    }
 
-	@Override
-	default OSGiResult run(BundleContext bundleContext) {
-		return run(bundleContext, (__) -> () -> {});
-	}
+    @Override
+    public Runnable publish(T t) {
+        return _publisher.publish(t);
+    }
 
+    private final OSGiResult _result;
+    private final Publisher<T> _publisher;
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java Fri Nov 17 16:00:41 2017
@@ -17,12 +17,9 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
 import org.osgi.framework.BundleContext;
 
-import java.util.function.Function;
-
 /**
  * @author Carlos Sierra Andrés
  */
@@ -32,41 +29,22 @@ public class ProbeImpl<T> extends OSGiIm
         super(new ProbeOperationImpl<>());
     }
 
-    public Function<T, Runnable> getOperation() {
+    public Publisher<T> getPublisher() {
         return ((ProbeOperationImpl<T>) _operation)._op;
     }
 
-    public static <T, S> Function<T, Runnable> getProbePipe(
-        Function<OSGi<T>, OSGi<S>> then, BundleContext bundleContext,
-        Function<S, Runnable> publisher) {
-
-        ProbeImpl<T> thenProbe = new ProbeImpl<>();
-
-        OSGiImpl<S> thenNext = (OSGiImpl<S>) then.apply(thenProbe);
-
-        OSGiResult thenResult = thenNext._operation.run(
-            bundleContext, publisher);
-
-        Function<T, Runnable> thenPipe = thenProbe.getOperation();
-
-        thenResult.start();
-
-        return thenPipe;
-    }
-
     private static class ProbeOperationImpl<T> implements OSGiOperationImpl<T> {
 
-        BundleContext _bundleContext;
-        Function<T, Runnable> _op;
-
         @Override
         public OSGiResultImpl run(
-            BundleContext bundleContext, Function<T, Runnable> op) {
+            BundleContext bundleContext, Publisher<T> op) {
             _bundleContext = bundleContext;
             _op = op;
 
             return new OSGiResultImpl(NOOP, NOOP);
         }
+        BundleContext _bundleContext;
+        Publisher<T> _op;
     }
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Fri Nov 17 16:00:41 2017
@@ -56,10 +56,6 @@ public class ServiceReferenceOSGi<T>
 	private static class DefaultServiceTrackerCustomizer<T>
 		implements ServiceTrackerCustomizer<T, Tracked<T>> {
 
-		private final Function<CachingServiceReference<T>, Runnable>
-			_addedSource;
-		private Refresher<? super CachingServiceReference<T>> _refresher;
-
 		public DefaultServiceTrackerCustomizer(
 			Function<CachingServiceReference<T>, Runnable> addedSource,
 			Refresher<? super CachingServiceReference<T>> refresher) {
@@ -97,13 +93,15 @@ public class ServiceReferenceOSGi<T>
 
 			tracked.runnable.run();
 		}
+
+		private final Function<CachingServiceReference<T>, Runnable>
+			_addedSource;
+		private Refresher<? super CachingServiceReference<T>> _refresher;
+
 	}
 
 	private static class Tracked<T> {
 
-		volatile CachingServiceReference<T> cachingServiceReference;
-		volatile Runnable runnable;
-
 		public Tracked(
 			CachingServiceReference<T> cachingServiceReference,
 			Runnable runnable) {
@@ -111,5 +109,9 @@ public class ServiceReferenceOSGi<T>
 			this.cachingServiceReference = cachingServiceReference;
 			this.runnable = runnable;
 		}
+
+		volatile CachingServiceReference<T> cachingServiceReference;
+		volatile Runnable runnable;
+
 	}
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java Fri Nov 17 16:00:41 2017
@@ -17,19 +17,17 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import java.util.function.Function;
+import org.apache.aries.osgi.functional.Transformer;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class TransformerOSGi<T, R> extends OSGiImpl<R> {
 
-    public TransformerOSGi(
-        OSGiImpl<T> previous,
-        Function<Function<R, Runnable>, Function<T, Runnable>> fun) {
+    public TransformerOSGi(OSGiImpl<T> previous, Transformer<T, R> fun) {
 
         super((bundleContext, op) ->
-            previous._operation.run(bundleContext, fun.apply(op)));
+            previous._operation.run(bundleContext, fun.transform(op)));
     }
 
 }

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java Fri Nov 17 16:00:41 2017
@@ -18,14 +18,12 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.test.DSLTest;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -70,11 +68,11 @@ public class ProbeTests {
 
         program.run(bundleContext, result::set);
 
-        Function<String, Runnable> opA = probeA.getOperation();
+        Function<String, Runnable> opA = probeA.getPublisher();
 
         Runnable sentA = opA.apply("Hello");
 
-        Function<String, Runnable> opB = probeBreference.get().getOperation();
+        Function<String, Runnable> opB = probeBreference.get().getPublisher();
 
         sentA.run();
 
@@ -85,10 +83,10 @@ public class ProbeTests {
 
         program.run(bundleContext, result::set);
 
-        opA = probeA.getOperation();
+        opA = probeA.getPublisher();
         sentA = opA.apply("Hello");
 
-        opB = probeBreference.get().getOperation();
+        opB = probeBreference.get().getPublisher();
         sentB = opB.apply(", World");
 
         assertEquals("Hello, World", result.get());
@@ -117,7 +115,7 @@ public class ProbeTests {
         program.run(bundleContext, result::set);
         assertEquals(0, result.get());
 
-        Function<Integer, Runnable> opA = probeA.getOperation();
+        Function<Integer, Runnable> opA = probeA.getPublisher();
 
         Runnable sentA = opA.apply(5);
         assertEquals(15, result.get());

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java Fri Nov 17 16:00:41 2017
@@ -19,7 +19,6 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.internal.ProbeImpl;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -337,9 +336,9 @@ public class AsynchronousTest {
 
         result.start();
 
-        Function<Integer, Runnable> opa = ((ProbeImpl<Integer>) as).getOperation();
-        Function<Integer, Runnable> opb = ((ProbeImpl<Integer>) bs).getOperation();
-        Function<Integer, Runnable> opc = ((ProbeImpl<Integer>) cs).getOperation();
+        Function<Integer, Runnable> opa = ((ProbeImpl<Integer>) as).getPublisher();
+        Function<Integer, Runnable> opb = ((ProbeImpl<Integer>) bs).getPublisher();
+        Function<Integer, Runnable> opc = ((ProbeImpl<Integer>) cs).getPublisher();
 
         ExecutorService executor = Executors.newFixedThreadPool(8);
 

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java Fri Nov 17 16:00:41 2017
@@ -23,6 +23,7 @@ import org.apache.aries.osgi.functional.
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java Fri Nov 17 16:00:41 2017
@@ -712,7 +712,7 @@ public class DSLTest {
 
         once.run(bundleContext);
 
-        Function<Integer, Runnable> op = probe.getOperation();
+        Function<Integer, Runnable> op = probe.getPublisher();
 
         assertEquals(0, count.get());