You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/08/28 18:37:29 UTC

svn commit: r1806481 - /aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java

Author: csierra
Date: Mon Aug 28 18:37:29 2017
New Revision: 1806481

URL: http://svn.apache.org/viewvc?rev=1806481&view=rev
Log:
Optimize flatMap to use ServiceTracker

Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1806481&r1=1806480&r2=1806481&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Mon Aug 28 18:37:29 2017
@@ -17,16 +17,26 @@
 
 package org.apache.aries.osgi.functional.internal;
 
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceObjects;
 import org.osgi.framework.ServiceReference;
 import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
 
 import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.internal.OSGiImpl.buildFilter;
 
 /**
  * @author Carlos Sierra Andrés
  */
-public class ServiceReferenceOSGi<T>
-	extends OSGiImpl<ServiceReference<T>> {
+public class ServiceReferenceOSGi<T> extends OSGiImpl<ServiceReference<T>> {
+
+	private String _filterString;
+	private Class<T> _clazz;
 
 	public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
 		super(bundleContext -> {
@@ -43,33 +53,42 @@ public class ServiceReferenceOSGi<T>
 				removed.getSource();
 
 			ServiceTracker<T, Tuple<ServiceReference<T>>> serviceTracker =
-				new ServiceTracker<T, Tuple<ServiceReference<T>>>(
+				new ServiceTracker<>(
 					bundleContext,
-					OSGiImpl.buildFilter(
-						bundleContext, filterString, clazz), null) {
+					buildFilter(bundleContext, filterString, clazz),
+					new DefaultServiceTrackerCustomizer<>(
+						addedSource, removedSource));
 
-					@Override
-					public Tuple<ServiceReference<T>> addingService(
-						ServiceReference<T> reference) {
+			return new OSGiResultImpl<>(
+				added, removed, serviceTracker::open,
+				serviceTracker::close);
 
-						Tuple<ServiceReference<T>> tuple = Tuple.create(
-							reference);
+		});
 
-						addedSource.accept(tuple);
+		_filterString = filterString;
+		_clazz = clazz;
+	}
+
+	@Override
+	public <S> OSGiImpl<S> flatMap(
+		Function<? super ServiceReference<T>, OSGi<? extends S>> fun) {
 
-						return tuple;
-					}
+		return new OSGiImpl<>(bundleContext -> {
+			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
 
-					@Override
-					public void removedService(
-						ServiceReference<T> reference,
-						Tuple<ServiceReference<T>> t) {
+			Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
 
-						super.removedService(reference, t);
+			Consumer<Tuple<S>> addedSource = added.getSource();
 
-						removedSource.accept(t);
-					}
-				};
+			Consumer<Tuple<S>> removedSource = removed.getSource();
+
+			ServiceTracker<T, Tracked<T, S>> serviceTracker =
+				new ServiceTracker<>(
+					bundleContext,
+					buildFilter(
+						bundleContext, _filterString, _clazz),
+						new FlatMapServiceTrackerCustomizer<>(
+							fun, bundleContext, addedSource, removedSource));
 
 			return new OSGiResultImpl<>(
 				added, removed, serviceTracker::open,
@@ -78,4 +97,104 @@ public class ServiceReferenceOSGi<T>
 		});
 	}
 
+	private static class DefaultServiceTrackerCustomizer<T>
+		implements ServiceTrackerCustomizer<T, Tuple<ServiceReference<T>>> {
+
+		private final Consumer<Tuple<ServiceReference<T>>> _addedSource;
+		private final Consumer<Tuple<ServiceReference<T>>> _removedSource;
+
+		public DefaultServiceTrackerCustomizer(
+			Consumer<Tuple<ServiceReference<T>>> addedSource,
+			Consumer<Tuple<ServiceReference<T>>> removedSource) {
+
+			_addedSource = addedSource;
+			_removedSource = removedSource;
+		}
+
+		@Override
+		public Tuple<ServiceReference<T>> addingService(
+			ServiceReference<T> reference) {
+
+			Tuple<ServiceReference<T>> tuple = Tuple.create(reference);
+
+			_addedSource.accept(tuple);
+
+			return tuple;
+		}
+
+		@Override
+		public void modifiedService(
+			ServiceReference<T> reference, Tuple<ServiceReference<T>> service) {
+
+		}
+
+		@Override
+		public void removedService(
+			ServiceReference<T> reference, Tuple<ServiceReference<T>> tuple) {
+
+			_removedSource.accept(tuple);
+		}
+	}
+
+	private static class FlatMapServiceTrackerCustomizer<T, S>
+		implements ServiceTrackerCustomizer<T, Tracked<T, S>> {
+		private final Function<? super ServiceReference<T>, OSGi<? extends S>>
+			_fun;
+		private final BundleContext _bundleContext;
+		private final Consumer<Tuple<S>> _addedSource;
+
+		private final Consumer<Tuple<S>> _removedSource;
+
+		public FlatMapServiceTrackerCustomizer(
+			Function<? super ServiceReference<T>, OSGi<? extends S>> fun,
+			BundleContext bundleContext, Consumer<Tuple<S>> addedSource,
+			Consumer<Tuple<S>> removedSource) {
+
+			_fun = fun;
+			_bundleContext = bundleContext;
+			_addedSource = addedSource;
+			_removedSource = removedSource;
+		}
+
+		@Override
+        public Tracked<T, S> addingService(ServiceReference<T> reference) {
+            OSGi<? extends S> program = _fun.apply(reference);
+
+            Tracked<T, S> tracked = new Tracked<>();
+
+            tracked.program = program.run(
+				_bundleContext, s -> {
+                    Tuple<S> tuple = Tuple.create(s);
+
+                    tracked.result = tuple;
+
+                    _addedSource.accept(tuple);
+                }
+            );
+
+            return tracked;
+        }
+
+		@Override
+        public void modifiedService(
+        	ServiceReference<T> reference, Tracked<T, S> tracked) {
+
+            removedService(reference, tracked);
+
+            addingService(reference);
+        }
+
+		@Override
+        public void removedService(
+            ServiceReference<T> reference, Tracked<T, S> tracked) {
+
+            tracked.program.close();
+
+            if (tracked.result != null) {
+                _removedSource.accept(tracked.result);
+            }
+        }
+
+	}
+
 }