You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/11/28 16:42:39 UTC

svn commit: r1771765 - in /aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional: ./ internal/

Author: csierra
Date: Mon Nov 28 16:42:38 2016
New Revision: 1771765

URL: http://svn.apache.org/viewvc?rev=1771765&view=rev
Log:
Classes refactoring and proper type bounds

Added:
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
Removed:
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/MOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleMOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/MOSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesMOSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesMOSGi.java
Modified:
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
    aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Mon Nov 28 16:42:38 2016
@@ -19,17 +19,17 @@
 package org.apache.aries.osgi.functional;
 
 import org.apache.aries.osgi.functional.internal.BundleContextOSGiImpl;
-import org.apache.aries.osgi.functional.internal.BundleMOSGi;
+import org.apache.aries.osgi.functional.internal.BundleOSGi;
 import org.apache.aries.osgi.functional.internal.ChangeContextOSGiImpl;
 import org.apache.aries.osgi.functional.internal.ConfigurationOSGiImpl;
 import org.apache.aries.osgi.functional.internal.ConfigurationsOSGiImpl;
 import org.apache.aries.osgi.functional.internal.JustOSGiImpl;
 import org.apache.aries.osgi.functional.internal.NothingOSGiImpl;
 import org.apache.aries.osgi.functional.internal.OnCloseOSGiImpl;
-import org.apache.aries.osgi.functional.internal.PrototypesMOSGi;
+import org.apache.aries.osgi.functional.internal.PrototypesOSGi;
 import org.apache.aries.osgi.functional.internal.ServiceReferenceOSGi;
 import org.apache.aries.osgi.functional.internal.ServiceRegistrationOSGiImpl;
-import org.apache.aries.osgi.functional.internal.ServicesMOSGi;
+import org.apache.aries.osgi.functional.internal.ServicesOSGi;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceObjects;
@@ -38,7 +38,9 @@ import org.osgi.framework.ServiceRegistr
 
 import java.util.Dictionary;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 
 /**
  * @author Carlos Sierra Andrés
@@ -46,21 +48,21 @@ import java.util.function.Function;
 public interface OSGi<T> extends OSGiRunnable<T> {
 	Runnable NOOP = () -> {};
 
-	<S> OSGi<S> map(Function<T, S> function);
+	<S> OSGi<S> map(Function<? super T, ? extends S> function);
 
-	<S> OSGi<S> flatMap(Function<T, OSGi<S>> fun);
+	<S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
 
 	<S> OSGi<S> then(OSGi<S> next);
 
-	<S> OSGi<Void> foreach(Function<T, OSGi<S>> fun);
+	OSGi<Void> foreach(Consumer<? super T> action);
 
 	static OSGi<BundleContext> bundleContext() {
 
 		return new BundleContextOSGiImpl();
 	}
 
-	static MOSGi<Bundle> bundles(int stateMask) {
-		return new BundleMOSGi(stateMask);
+	static OSGi<Bundle> bundles(int stateMask) {
+		return new BundleOSGi(stateMask);
 	}
 
 	static <T> OSGi<T> changeContext(
@@ -77,6 +79,10 @@ public interface OSGi<T> extends OSGiRun
 		return new ConfigurationsOSGiImpl(factoryPid);
 	}
 
+	static <S> OSGi<S> just(S s) {
+		return new JustOSGiImpl<>(s);
+	}
+
 	static <S> OSGi<S> nothing() {
 		return new NothingOSGiImpl<>();
 	}
@@ -85,22 +91,18 @@ public interface OSGi<T> extends OSGiRun
 		return new OnCloseOSGiImpl(action);
 	}
 
-	static <S> OSGi<S> just(S s) {
-		return new JustOSGiImpl<>(s);
-	}
-
-	static MOSGi<ServiceObjects<Object>> prototypes(String filterString) {
+	static OSGi<ServiceObjects<Object>> prototypes(String filterString) {
 		return prototypes(null, filterString);
 	}
 
-	static <T> MOSGi<ServiceObjects<T>> prototypes(Class<T> clazz) {
+	static <T> OSGi<ServiceObjects<T>> prototypes(Class<T> clazz) {
 		return prototypes(clazz, null);
 	}
 
-	static <T> MOSGi<ServiceObjects<T>> prototypes(
+	static <T> OSGi<ServiceObjects<T>> prototypes(
 		Class<T> clazz, String filterString) {
 
-		return new PrototypesMOSGi<>(clazz, filterString);
+		return new PrototypesOSGi<>(clazz, filterString);
 	}
 
 	static <T, S extends T> OSGi<ServiceRegistration<T>> register(
@@ -110,16 +112,16 @@ public interface OSGi<T> extends OSGiRun
 			clazz, service, properties);
 	}
 
-	static <T> MOSGi<T> services(Class<T> clazz) {
+	static <T> OSGi<T> services(Class<T> clazz) {
 		return services(clazz, null);
 	}
 
-	static <T> MOSGi<Object> services(String filterString) {
+	static <T> OSGi<Object> services(String filterString) {
 		return services(null, filterString);
 	}
 
-	static <T> MOSGi<T> services(Class<T> clazz, String filterString) {
-		return new ServicesMOSGi<>(clazz, filterString);
+	static <T> OSGi<T> services(Class<T> clazz, String filterString) {
+		return new ServicesOSGi<>(clazz, filterString);
 	}
 
 	static <T> OSGi<ServiceReference<T>> serviceReferences(
@@ -140,4 +142,7 @@ public interface OSGi<T> extends OSGiRun
 		return new ServiceReferenceOSGi<>(filterString, clazz);
 	}
 
+	OSGi<T> filter(Predicate<T> predicate);
+
+	OSGi<Void> distribute(Function<T, OSGi<?>>... funs);
 }

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java Mon Nov 28 16:42:38 2016
@@ -19,24 +19,13 @@ package org.apache.aries.osgi.functional
 
 import org.osgi.framework.BundleContext;
 
-import java.util.function.Consumer;
-
 /**
  * @author Carlos Sierra Andrés
  */
 public class BundleContextOSGiImpl extends OSGiImpl<BundleContext> {
 
 	public BundleContextOSGiImpl() {
-		super(bundleContext -> {
-			Pipe<Tuple<BundleContext>, Tuple<BundleContext>> added =
-				Pipe.create();
-
-			Consumer<Tuple<BundleContext>> addedSource = added.getSource();
-
-			return new OSGiResultImpl<>(
-				added, Pipe.create(),
-				() -> addedSource.accept(Tuple.create(bundleContext)),
-				NOOP);
-		});
+		super(bundleContext ->
+			new JustOSGiImpl<>(bundleContext)._operation.run(bundleContext));
 	}
 }

Added: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1771765&view=auto
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java (added)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java Mon Nov 28 16:42:38 2016
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleEvent;
+import org.osgi.util.tracker.BundleTracker;
+import org.osgi.util.tracker.BundleTrackerCustomizer;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class BundleOSGi extends OSGiImpl<Bundle> {
+
+	private final int _stateMask;
+
+	public BundleOSGi(int stateMask) {
+		super(bundleContext -> {
+			Pipe<Tuple<Bundle>, Tuple<Bundle>> added = Pipe.create();
+
+			Consumer<Tuple<Bundle>> addedSource = added.getSource();
+
+			Pipe<Tuple<Bundle>, Tuple<Bundle>> removed = Pipe.create();
+
+			Consumer<Tuple<Bundle>> removedSource = removed.getSource();
+
+			BundleTracker<Tuple<Bundle>> bundleTracker =
+				new BundleTracker<>(
+					bundleContext, stateMask,
+					new BundleTrackerCustomizer<Tuple<Bundle>>() {
+
+						@Override
+						public Tuple<Bundle> addingBundle(
+							Bundle bundle, BundleEvent bundleEvent) {
+
+							Tuple<Bundle> tuple = Tuple.create(bundle);
+
+							addedSource.accept(tuple);
+
+							return tuple;
+						}
+
+						@Override
+						public void modifiedBundle(
+							Bundle bundle, BundleEvent bundleEvent,
+							Tuple<Bundle> tuple) {
+
+							removedBundle(bundle, bundleEvent, tuple);
+
+							addingBundle(bundle, bundleEvent);
+						}
+
+						@Override
+						public void removedBundle(
+							Bundle bundle, BundleEvent bundleEvent,
+							Tuple<Bundle> tuple) {
+
+							removedSource.accept(tuple);
+						}
+					});
+
+			return new OSGiResultImpl<>(
+				added, removed, bundleTracker::open, bundleTracker::close);
+		});
+		_stateMask = stateMask;
+	}
+
+	@Override
+	public <S> OSGiImpl<S> flatMap(Function<? super Bundle, OSGi<? extends S>> fun) {
+		return new OSGiImpl<>(bundleContext -> {
+			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+			Consumer<Tuple<S>> addedSource = added.getSource();
+
+			Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+			Consumer<Tuple<S>> removedSource = removed.getSource();
+
+			BundleTracker<Tracked<Bundle, S>> bundleTracker =
+				new BundleTracker<>(
+					bundleContext, _stateMask,
+					new BundleTrackerCustomizer<Tracked<Bundle, S>>() {
+
+						@Override
+						public Tracked<Bundle, S> addingBundle(
+							Bundle bundle, BundleEvent bundleEvent) {
+
+							OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(
+								bundle);
+
+							OSGiResultImpl<S> result =
+								program._operation.run(bundleContext);
+
+							Tracked<Bundle, S> tracked = new Tracked<>();
+
+							tracked.service = bundle;
+							tracked.program = result;
+
+							result.added.map(s -> {
+								tracked.result = s;
+
+								addedSource.accept(s);
+
+								return s;
+							});
+
+							result.start.run();
+
+							return tracked;
+						}
+
+						@Override
+						public void modifiedBundle(
+							Bundle bundle, BundleEvent bundleEvent,
+							Tracked<Bundle, S> tracked) {
+
+							removedBundle(bundle, bundleEvent, tracked);
+
+							addingBundle(bundle, bundleEvent);
+						}
+
+						@Override
+						public void removedBundle(
+							Bundle bundle, BundleEvent bundleEvent,
+							Tracked<Bundle, S> tracked) {
+
+							tracked.program.close();
+
+							if (tracked.result != null) {
+								removedSource.accept(tracked.result);
+							}
+						}
+					});
+
+			return new OSGiResultImpl<>(
+				added, removed, bundleTracker::open, bundleTracker::close);
+
+		});
+	}
+
+}

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java Mon Nov 28 16:42:38 2016
@@ -19,24 +19,56 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
 
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * @author Carlos Sierra Andrés
  */
-public class JustOSGiImpl<S> extends OSGiImpl<S> {
+public class JustOSGiImpl<T> extends OSGiImpl<T> {
 
-	public JustOSGiImpl(S s) {
+	private T _t;
+
+	public JustOSGiImpl(T t) {
 		super(((bundleContext) -> {
 
-			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+			Pipe<Tuple<T>, Tuple<T>> added = Pipe.create();
 
-			Consumer<Tuple<S>> source = added.getSource();
+			Consumer<Tuple<T>> source = added.getSource();
 
 			return new OSGiResultImpl<>(
 				added, Pipe.create(),
-				() -> source.accept(Tuple.create(s)), OSGi.NOOP);
+				() -> source.accept(Tuple.create(t)), OSGi.NOOP);
 		}));
+
+		_t = t;
+	}
+
+	@Override
+	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
+		return new OSGiImpl<>(bundleContext -> {
+			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+			Consumer<Tuple<S>> addedSource = added.getSource();
+
+			AtomicReference<OSGiResult<? extends S>> atomicReference =
+				new AtomicReference<>(null);
+
+			return new OSGiResultImpl<>(
+				added, Pipe.create(),
+				() -> {
+					OSGi<? extends S> next = fun.apply(_t);
+
+					atomicReference.set(
+						next.run(
+							bundleContext,
+							s -> addedSource.accept(Tuple.create(s))));
+
+				},
+				() -> atomicReference.get().close());
+		});
 	}
 }

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Mon Nov 28 16:42:38 2016
@@ -23,11 +23,16 @@ import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.IdentityHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * @author Carlos Sierra Andrés
@@ -41,10 +46,11 @@ public class OSGiImpl<T> implements OSGi
 	}
 
 	@Override
-	public <S> OSGiImpl<S> flatMap(Function<T, OSGi<S>> fun) {
+	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
 		return new OSGiImpl<>(
 			((bundleContext) -> {
-				Map<Object, OSGiResult<S>> identities = new IdentityHashMap<>();
+				Map<Object, OSGiResult<? extends S>> identities =
+					new IdentityHashMap<>();
 
 				AtomicReference<Runnable> closeReference =
 					new AtomicReference<>(NOOP);
@@ -69,9 +75,9 @@ public class OSGiImpl<T> implements OSGi
 					closeReference.set(or1.close);
 
 					or1.added.map(t -> {
-						OSGi<S> program = fun.apply(t.t);
+						OSGi<? extends S> program = fun.apply(t.t);
 
-						OSGiResult<S> or2 = program.run(
+						OSGiResult<? extends S> or2 = program.run(
 							bundleContext,
 							s -> addedSource.accept(Tuple.create(s)));
 
@@ -82,8 +88,8 @@ public class OSGiImpl<T> implements OSGi
 
 					or1.removed.map(t -> {
 						synchronized (identities) {
-							OSGiResult<S> osgiResult1 = identities.remove(
-								t.original);
+							OSGiResult<? extends S> osgiResult1 =
+								identities.remove(t.original);
 
 							if (osgiResult1 != null) {
 								osgiResult1.close();
@@ -102,12 +108,12 @@ public class OSGiImpl<T> implements OSGi
 	}
 
 	@Override
-	public <S> OSGi<Void> foreach(Function<T, OSGi<S>> fun) {
-		return this.flatMap(fun).map(x -> null);
+	public OSGi<Void> foreach(Consumer<? super T> consumer) {
+		return this.map(f ->  {consumer.accept(f); return null;});
 	}
 
 	@Override
-	public <S> OSGi<S> map(Function<T, S> function) {
+	public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
 		return new OSGiImpl<>(((bundleContext) -> {
 			OSGiResultImpl<T> osgiResult = _operation.run(bundleContext);
 
@@ -190,6 +196,43 @@ public class OSGiImpl<T> implements OSGi
 		return stringBuilder.toString();
 	}
 
+	@Override
+	public OSGi<T> filter(Predicate<T> predicate) {
+		return flatMap(t -> {
+			if (predicate.test(t)) {
+				return OSGi.just(t);
+			}
+			else {
+				return OSGi.nothing();
+			}
+		});
+	}
+
+	@Override
+	public OSGi<Void> distribute(Function<T, OSGi<?>>... funs) {
+		return new OSGiImpl<>(bundleContext -> {
+			ArrayList<OSGiResult> results = new ArrayList<>();
+
+			Pipe<Tuple<Void>, Tuple<Void>> added = Pipe.create();
+
+			Consumer<Tuple<Void>> addedSource = added.getSource();
+
+			return new OSGiResultImpl<>(
+				added, Pipe.create(),
+				() -> {
+					List<OSGiResult> results2 = Arrays.stream(funs).
+						map(this::flatMap).
+						map(o -> o.run(bundleContext)).
+						collect(Collectors.toList());
+
+					results.addAll(results2);
+
+					addedSource.accept(Tuple.create(null));
+				},
+				() -> results.stream().forEach(OSGiResult::close)
+			);
+		});
+	}
 }
 
 

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java Mon Nov 28 16:42:38 2016
@@ -39,7 +39,7 @@ class Pipe<I, O> {
 		return i -> pipe.apply(i);
 	}
 
-	<U> Pipe<I, U> map(Function<O, U> fun) {
+	<U> Pipe<I, U> map(Function<? super O, ? extends U> fun) {
 		this.pipe = (Function)this.pipe.andThen(fun);
 
 		return (Pipe<I, U>)this;

Added: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java?rev=1771765&view=auto
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java (added)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java Mon Nov 28 16:42:38 2016
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.osgi.framework.ServiceObjects;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class PrototypesOSGi<T>
+	extends OSGiImpl<ServiceObjects<T>> {
+
+	private final String _filterString;
+
+	private final Class<T> _clazz;
+
+	public PrototypesOSGi(Class<T> clazz, String filterString) {
+		super(bundleContext -> {
+			Pipe<Tuple<ServiceObjects<T>>, Tuple<ServiceObjects<T>>> added =
+				Pipe.create();
+
+			Pipe<Tuple<ServiceObjects<T>>, Tuple<ServiceObjects<T>>>
+				removed = Pipe.create();
+
+			Consumer<Tuple<ServiceObjects<T>>> addedSource =
+				added.getSource();
+
+			Consumer<Tuple<ServiceObjects<T>>> removedSource =
+				removed.getSource();
+
+			ServiceTracker<T, Tuple<ServiceObjects<T>>> serviceTracker =
+				new ServiceTracker<>(
+					bundleContext,
+					OSGiImpl.buildFilter(
+						bundleContext, filterString, clazz),
+					new ServiceTrackerCustomizer
+						<T, Tuple<ServiceObjects<T>>>() {
+
+						@Override
+						public Tuple<ServiceObjects<T>> addingService(
+							ServiceReference<T> reference) {
+
+							ServiceObjects<T> serviceObjects =
+								bundleContext.getServiceObjects(reference);
+
+							Tuple<ServiceObjects<T>> tuple =
+								Tuple.create(serviceObjects);
+
+							addedSource.accept(tuple);
+
+							return tuple;
+						}
+
+						@Override
+						public void modifiedService(
+							ServiceReference<T> reference,
+							Tuple<ServiceObjects<T>> service) {
+
+							removedService(reference, service);
+
+							addingService(reference);
+						}
+
+						@Override
+						public void removedService(
+							ServiceReference<T> reference,
+							Tuple<ServiceObjects<T>> tuple) {
+
+							removedSource.accept(tuple);
+						}
+					});
+
+			return new OSGiResultImpl<>(
+				added, removed, serviceTracker::open,
+				serviceTracker::close);
+		});
+
+		_filterString = filterString;
+		_clazz = clazz;
+	}
+
+	@Override
+	public <S> OSGiImpl<S> flatMap(
+		Function<? super ServiceObjects<T>, OSGi<? extends S>> fun) {
+		return new OSGiImpl<>(bundleContext -> {
+			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+			Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+			Consumer<Tuple<S>> addedSource = added.getSource();
+
+			Consumer<Tuple<S>> removedSource = removed.getSource();
+
+			ServiceTracker<T, Tracked<ServiceObjects<T>, S>>
+				serviceTracker = new ServiceTracker<>(
+				bundleContext,
+				buildFilter(bundleContext, _filterString, _clazz),
+				new ServiceTrackerCustomizer
+					<T, Tracked<ServiceObjects<T>, S>>() {
+
+					@Override
+					public Tracked<ServiceObjects<T>, S> addingService(
+						ServiceReference<T> reference) {
+
+						ServiceObjects<T> serviceObjects =
+							bundleContext.getServiceObjects(
+								reference);
+
+						OSGi<? extends S> program = fun.apply(serviceObjects);
+
+						Tracked<ServiceObjects<T>, S> tracked =
+							new Tracked<>();
+
+						OSGiResult<? extends S> result = program.run(
+							bundleContext, s -> {
+								Tuple<S> tuple = Tuple.create(s);
+
+								tracked.result = tuple;
+
+								addedSource.accept(tuple);
+							}
+						);
+
+						tracked.program = result;
+						tracked.service = serviceObjects;
+
+						return tracked;
+					}
+
+					@Override
+					public void modifiedService(
+						ServiceReference<T> reference,
+						Tracked<ServiceObjects<T>, S> tracked) {
+
+						removedService(reference, tracked);
+
+						addingService(reference);
+					}
+
+					@Override
+					public void removedService(
+						ServiceReference<T> reference,
+						Tracked<ServiceObjects<T>, S> tracked) {
+
+						tracked.program.close();
+
+						if (tracked.result != null) {
+							removedSource.accept(tracked.result);
+						}
+					}
+				});
+
+			return new OSGiResultImpl<>(
+				added, removed, serviceTracker::open,
+				serviceTracker::close);
+		});
+	}
+
+}

Added: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java?rev=1771765&view=auto
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java (added)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java Mon Nov 28 16:42:38 2016
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.osgi.framework.ServiceObjects;
+import org.osgi.framework.ServiceReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class ServicesOSGi<T> extends OSGiImpl<T> {
+
+	private final String _filterString;
+
+	private final Class<T> _clazz;
+
+	public ServicesOSGi(Class<T> clazz, String filterString) {
+		super(bundleContext -> {
+			Pipe<Tuple<T>, Tuple<T>> added = Pipe.create();
+
+			Pipe<Tuple<T>, Tuple<T>> removed = Pipe.create();
+
+			Consumer<Tuple<T>> addedSource = added.getSource();
+
+			Consumer<Tuple<T>> removedSource = removed.getSource();
+
+			ServiceTracker<T, Tuple<T>> serviceTracker =
+				new ServiceTracker<>(
+					bundleContext,
+					OSGiImpl.buildFilter(
+						bundleContext, filterString, clazz),
+					new ServiceTrackerCustomizer<T, Tuple<T>>() {
+						@Override
+						public Tuple<T> addingService(
+							ServiceReference<T> reference) {
+
+							ServiceObjects<T> serviceObjects =
+								bundleContext.getServiceObjects(reference);
+
+							T service = serviceObjects.getService();
+
+							Tuple<T> tuple = Tuple.create(service);
+
+							addedSource.accept(tuple);
+
+							return tuple;
+						}
+
+						@Override
+						public void modifiedService(
+							ServiceReference<T> reference,
+							Tuple<T> service) {
+
+							removedService(reference, service);
+
+							addingService(reference);
+						}
+
+						@Override
+						public void removedService(
+							ServiceReference<T> reference, Tuple<T> tuple) {
+
+							ServiceObjects<T> serviceObjects =
+								bundleContext.getServiceObjects(reference);
+
+							removedSource.accept(tuple);
+
+							serviceObjects.ungetService(tuple.t);
+						}
+					});
+
+			return new OSGiResultImpl<>(
+				added, removed, serviceTracker::open,
+				serviceTracker::close);
+		});
+
+		_filterString = filterString;
+
+		_clazz = clazz;
+	}
+
+	@Override
+	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
+		return new OSGiImpl<>(bundleContext -> {
+			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+			Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+			Consumer<Tuple<S>> addedSource = added.getSource();
+
+			Consumer<Tuple<S>> removedSource = removed.getSource();
+
+			ServiceTracker<T, Tracked<T, S>> serviceTracker =
+				new ServiceTracker<>(
+					bundleContext,
+					buildFilter(
+						bundleContext, _filterString, _clazz),
+					new ServiceTrackerCustomizer<T, Tracked<T, S>>() {
+						@Override
+						public Tracked<T, S> addingService(
+							ServiceReference<T> reference) {
+
+							ServiceObjects<T> serviceObjects =
+								bundleContext.getServiceObjects(
+									reference);
+
+							T service = serviceObjects.getService();
+
+							OSGi<? extends S> program = fun.apply(service);
+
+							Tracked<T, S> tracked = new Tracked<>();
+
+							OSGiResult<? extends S> result = program.run(
+								bundleContext, s -> {
+									Tuple<S> tuple = Tuple.create(s);
+
+									tracked.result = tuple;
+
+									addedSource.accept(tuple);
+								}
+							);
+
+							tracked.service = service;
+							tracked.program = result;
+
+							return tracked;
+						}
+
+						@Override
+						public void modifiedService(
+							ServiceReference<T> reference,
+							Tracked<T, S> tracked) {
+
+							removedService(reference, tracked);
+
+							addingService(reference);
+						}
+
+						@Override
+						public void removedService(
+							ServiceReference<T> reference,
+							Tracked<T, S> tracked) {
+
+							tracked.program.close();
+
+							if (tracked.result != null) {
+								removedSource.accept(tracked.result);
+							}
+
+							ServiceObjects<T> serviceObjects =
+								bundleContext.getServiceObjects(
+									reference);
+
+							serviceObjects.ungetService(
+								tracked.service);
+						}
+					});
+
+			return new OSGiResultImpl<>(
+				added, removed, serviceTracker::open,
+				serviceTracker::close);
+
+		});
+	}
+
+}

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java Mon Nov 28 16:42:38 2016
@@ -25,7 +25,7 @@ import org.apache.aries.osgi.functional.
 class Tracked<T, S> {
 
 	T service = null;
-	OSGiResult<S> program = null;
+	OSGiResult<? extends S> program = null;
 
 	Tuple<S> result = null;
 

Modified: aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1771765&r1=1771764&r2=1771765&view=diff
==============================================================================
--- aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java (original)
+++ aries/trunk/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java Mon Nov 28 16:42:38 2016
@@ -33,7 +33,7 @@ class Tuple<T> {
 		this.t = t;
 	}
 
-	public <S> Tuple<S> map(Function<T, S> fun) {
+	public <S> Tuple<S> map(Function<? super T, ? extends S> fun) {
 		return new Tuple<>(original, fun.apply(t));
 	}