You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/08/28 18:37:38 UTC

svn commit: r1806482 - /aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/

Author: csierra
Date: Mon Aug 28 18:37:37 2017
New Revision: 1806482

URL: http://svn.apache.org/viewvc?rev=1806482&view=rev
Log:
Abstract flatMap and implement it thread safe

Added:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java Mon Aug 28 18:37:37 2017
@@ -18,6 +18,7 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleEvent;
 import org.osgi.util.tracker.BundleTracker;
@@ -81,11 +82,14 @@ public class BundleOSGi extends OSGiImpl
 			return new OSGiResultImpl<>(
 				added, removed, bundleTracker::open, bundleTracker::close);
 		});
+
 		_stateMask = stateMask;
 	}
 
 	@Override
-	public <S> OSGiImpl<S> flatMap(Function<? super Bundle, OSGi<? extends S>> fun) {
+	public <S> OSGiImpl<S> flatMap(
+		Function<? super Bundle, OSGi<? extends S>> fun) {
+
 		return new OSGiImpl<>(bundleContext -> {
 			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
 
@@ -95,13 +99,13 @@ public class BundleOSGi extends OSGiImpl
 
 			Consumer<Tuple<S>> removedSource = removed.getSource();
 
-			BundleTracker<Tracked<Bundle, S>> bundleTracker =
+			BundleTracker<OSGiResult<S>> bundleTracker =
 				new BundleTracker<>(
 					bundleContext, _stateMask,
-					new BundleTrackerCustomizer<Tracked<Bundle, S>>() {
+					new BundleTrackerCustomizer<OSGiResult<S>>() {
 
 						@Override
-						public Tracked<Bundle, S> addingBundle(
+						public OSGiResult<S> addingBundle(
 							Bundle bundle, BundleEvent bundleEvent) {
 
 							OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(
@@ -110,30 +114,17 @@ public class BundleOSGi extends OSGiImpl
 							OSGiResultImpl<S> result =
 								program._operation.run(bundleContext);
 
-							Tracked<Bundle, S> tracked = new Tracked<>();
-
-							tracked.service = bundle;
-							tracked.program = result;
-
-							result.added.map(s -> {
-								tracked.result = s;
+							result.pipeTo(addedSource, removedSource);
 
-								addedSource.accept(s);
-
-								return s;
-							});
-
-							result.start.run();
-
-							return tracked;
+							return result;
 						}
 
 						@Override
 						public void modifiedBundle(
 							Bundle bundle, BundleEvent bundleEvent,
-							Tracked<Bundle, S> tracked) {
+							OSGiResult<S> result) {
 
-							removedBundle(bundle, bundleEvent, tracked);
+							removedBundle(bundle, bundleEvent, result);
 
 							addingBundle(bundle, bundleEvent);
 						}
@@ -141,13 +132,9 @@ public class BundleOSGi extends OSGiImpl
 						@Override
 						public void removedBundle(
 							Bundle bundle, BundleEvent bundleEvent,
-							Tracked<Bundle, S> tracked) {
-
-							tracked.program.close();
+							OSGiResult<S> result) {
 
-							if (tracked.result != null) {
-								removedSource.accept(tracked.result);
-							}
+							result.close();
 						}
 					});
 

Added: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java?rev=1806482&view=auto
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java (added)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java Mon Aug 28 18:37:37 2017
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.OSGi;
+import org.apache.aries.osgi.functional.OSGiResult;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/** {@code flatMap} step: for each tuple added by {@code previous}, runs the program {@code fun} returns for its value and pipes that program's added/removed tuples into this result; the inner result is closed when {@code previous} removes the tuple.
+ * @author Carlos Sierra Andrés
+ */
+public class FlatMapImpl<T, S> extends OSGiImpl<S> {
+
+	public FlatMapImpl(
+		OSGiImpl<T> previous, Function<? super T, OSGi<? extends S>> fun) {
+
+		super((bundleContext) -> {
+			Map<IdentityKey<Object>, OSGiResult<?>> identities =
+				new ConcurrentHashMap<>(); // inner results, keyed by the identity of t.original
+			// Close action of the previous program; stays NOOP until the start lambda below has run it.
+			AtomicReference<Runnable> closeReference =
+				new AtomicReference<>(NOOP);
+
+			Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+
+			Consumer<Tuple<S>> addedSource = added.getSource();
+
+			Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+			Consumer<Tuple<S>> removedSource = removed.getSource();
+			// First lambda wires previous to fun and starts it; second lambda tears everything down (presumably start/close, in that order).
+			return new OSGiResultImpl<>(
+				added, removed,
+				() -> {
+					OSGiResultImpl<T> or1 = previous._operation.run(
+						bundleContext);
+
+					closeReference.set(or1.close);
+					// For every tuple added by previous: run the program fun returns for its value.
+					or1.added.map(t -> {
+						OSGiImpl<S> program = (OSGiImpl<S>)fun.apply(t.t);
+
+						OSGiResultImpl<S> or2 =
+							program._operation.run(bundleContext);
+						// pipeTo forwards or2's tuples to addedSource/removedSource and also runs or2's start action.
+						or2.pipeTo(addedSource, removedSource);
+						// NOTE(review): registered only after or2 was started; a removal racing in before this put finds no entry - confirm acceptable.
+						identities.put(new IdentityKey<>(t.original), or2);
+						// NOTE(review): the inline code this replaces returned Tuple.create(null) here - confirm Pipe.map accepts null.
+						return null;
+					});
+					// For every tuple removed by previous: close the inner result registered for it, if any.
+					or1.removed.map(t -> {
+						OSGiResult<?> osgiResult1 = identities.remove(
+							new IdentityKey<>(t.original));
+
+						if (osgiResult1 != null) {
+							osgiResult1.close();
+						}
+
+						return null;
+					});
+
+					or1.start.run();
+				},
+				() -> {
+					identities.values().forEach(OSGiResult::close);
+					// After the inner results, run previous' close action (still NOOP if the start lambda never ran).
+					closeReference.get().run();
+				});
+			}
+		);
+	}
+	// Map key that compares the wrapped instance by reference (==) instead of equals().
+	private static class IdentityKey<T> {
+
+		private final T _instance;
+
+		public IdentityKey(T instance) {
+			_instance = instance;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) return true;
+			if (o == null || getClass() != o.getClass()) return false;
+
+			IdentityKey<?> that = (IdentityKey<?>) o;
+
+			return _instance == that._instance;
+		}
+		// Identity hash of the wrapped instance, so it agrees with the == comparison in equals.
+		@Override
+		public int hashCode() {
+			return System.identityHashCode(_instance);
+		}
+
+	}
+
+}
+
+

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Mon Aug 28 18:37:37 2017
@@ -25,7 +25,6 @@ import org.osgi.framework.Filter;
 import org.osgi.framework.InvalidSyntaxException;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
@@ -35,7 +34,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 /**
  * @author Carlos Sierra Andrés
@@ -50,70 +48,7 @@ public class OSGiImpl<T> implements OSGi
 
 	@Override
 	public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
-		return new OSGiImpl<>(
-			((bundleContext) -> {
-				Map<Object, OSGiResult<?>> identities =
-					new IdentityHashMap<>();
-
-				AtomicReference<Runnable> closeReference =
-					new AtomicReference<>(NOOP);
-
-				Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
-
-				Consumer<Tuple<S>> addedSource = added.getSource();
-
-				Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
-
-				Consumer<Tuple<S>> removedSource = removed.getSource();
-
-				return new OSGiResultImpl<>(
-					added, removed,
-					() -> {
-						OSGiResultImpl<T> or1 = _operation.run(bundleContext);
-
-						closeReference.set(or1.close);
-
-						or1.added.map(t -> {
-							OSGiImpl<S> program =
-								(OSGiImpl<S>)fun.apply(t.t);
-
-							OSGiResultImpl<S> or2 =
-								program._operation.run(bundleContext);
-
-							or2.added.map(s -> {addedSource.accept(s); return Tuple.create(null);});
-							or2.removed.map(s -> {removedSource.accept(s); return Tuple.create(null);});
-
-							or2.start.run();
-
-							identities.put(t.original, or2);
-
-							return Tuple.create(null);
-						});
-
-						or1.removed.map(t -> {
-							synchronized (identities) {
-								OSGiResult<?> osgiResult1 =
-									identities.remove(t.original);
-
-								if (osgiResult1 != null) {
-									osgiResult1.close();
-								}
-							}
-
-							return Tuple.create(null);
-						});
-
-						or1.start.run();
-					},
-					() -> {
-						synchronized (identities) {
-							identities.values().forEach(OSGiResult::close);
-						}
-
-						closeReference.get().run();
-					});
-			}
-			));
+		return new FlatMapImpl<>(this, fun);
 	}
 
 	@Override

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Mon Aug 28 18:37:37 2017
@@ -19,6 +19,8 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiResult;
 
+import java.util.function.Consumer;
+
 /**
  * @author Carlos Sierra Andrés
  */
@@ -44,4 +46,14 @@ public class OSGiResultImpl<T> implement
 		close.run();
 	}
 
+	public void pipeTo(
+		Consumer<Tuple<T>> addedSource, Consumer<Tuple<T>> removedSource) {
+		// Hand each added tuple to addedSource and each removed tuple to removedSource.
+		added.map(t -> {addedSource.accept(t); return null;});
+
+		removed.map(t -> {removedSource.accept(t); return null;});
+		// Runs this result's start action, after both mappings above have been set up.
+		start.run();
+	}
+
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/PrototypesOSGi.java Mon Aug 28 18:37:37 2017
@@ -129,22 +129,18 @@ public class PrototypesOSGi<T>
 							bundleContext.getServiceObjects(
 								reference);
 
-						OSGi<? extends S> program = fun.apply(serviceObjects);
+						OSGiImpl<S> program = (OSGiImpl<S>)fun.apply(
+							serviceObjects);
 
 						Tracked<ServiceObjects<T>, S> tracked =
 							new Tracked<>();
 
-						OSGiResult<? extends S> result = program.run(
-							bundleContext, s -> {
-								Tuple<S> tuple = Tuple.create(s);
+						OSGiResultImpl<S> result = program._operation.run(
+							bundleContext);
 
-								tracked.result = tuple;
+						result.pipeTo(addedSource, removedSource);
 
-								addedSource.accept(tuple);
-							}
-						);
-
-						tracked.program = result;
+						tracked.result = result;
 						tracked.service = serviceObjects;
 
 						return tracked;
@@ -165,11 +161,7 @@ public class PrototypesOSGi<T>
 						ServiceReference<T> reference,
 						Tracked<ServiceObjects<T>, S> tracked) {
 
-						tracked.program.close();
-
-						if (tracked.result != null) {
-							removedSource.accept(tracked.result);
-						}
+						tracked.result.close();
 					}
 				});
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Mon Aug 28 18:37:37 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceObjects;
 import org.osgi.framework.ServiceReference;
 import org.osgi.util.tracker.ServiceTracker;
 import org.osgi.util.tracker.ServiceTrackerCustomizer;
@@ -28,8 +27,6 @@ import org.osgi.util.tracker.ServiceTrac
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static org.apache.aries.osgi.functional.internal.OSGiImpl.buildFilter;
-
 /**
  * @author Carlos Sierra Andrés
  */
@@ -82,7 +79,7 @@ public class ServiceReferenceOSGi<T> ext
 
 			Consumer<Tuple<S>> removedSource = removed.getSource();
 
-			ServiceTracker<T, Tracked<T, S>> serviceTracker =
+			ServiceTracker<T, OSGiResult<S>> serviceTracker =
 				new ServiceTracker<>(
 					bundleContext,
 					buildFilter(
@@ -137,7 +134,7 @@ public class ServiceReferenceOSGi<T> ext
 	}
 
 	private static class FlatMapServiceTrackerCustomizer<T, S>
-		implements ServiceTrackerCustomizer<T, Tracked<T, S>> {
+		implements ServiceTrackerCustomizer<T, OSGiResult<S>> {
 		private final Function<? super ServiceReference<T>, OSGi<? extends S>>
 			_fun;
 		private final BundleContext _bundleContext;
@@ -157,27 +154,20 @@ public class ServiceReferenceOSGi<T> ext
 		}
 
 		@Override
-        public Tracked<T, S> addingService(ServiceReference<T> reference) {
-            OSGi<? extends S> program = _fun.apply(reference);
-
-            Tracked<T, S> tracked = new Tracked<>();
+        public OSGiResult<S> addingService(ServiceReference<T> reference) {
+            OSGiImpl<S> program = (OSGiImpl<S>) _fun.apply(reference);
 
-            tracked.program = program.run(
-				_bundleContext, s -> {
-                    Tuple<S> tuple = Tuple.create(s);
+			OSGiResultImpl<S> osgiResult = program._operation.run(
+				_bundleContext);
 
-                    tracked.result = tuple;
+			osgiResult.pipeTo(_addedSource, _removedSource);
 
-                    _addedSource.accept(tuple);
-                }
-            );
-
-            return tracked;
+			return osgiResult;
         }
 
 		@Override
         public void modifiedService(
-        	ServiceReference<T> reference, Tracked<T, S> tracked) {
+        	ServiceReference<T> reference, OSGiResult<S> tracked) {
 
             removedService(reference, tracked);
 
@@ -186,13 +176,9 @@ public class ServiceReferenceOSGi<T> ext
 
 		@Override
         public void removedService(
-            ServiceReference<T> reference, Tracked<T, S> tracked) {
-
-            tracked.program.close();
+            ServiceReference<T> reference, OSGiResult<S> tracked) {
 
-            if (tracked.result != null) {
-                _removedSource.accept(tracked.result);
-            }
+            tracked.close();
         }
 
 	}

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServicesOSGi.java Mon Aug 28 18:37:37 2017
@@ -128,22 +128,18 @@ public class ServicesOSGi<T> extends OSG
 
 							T service = serviceObjects.getService();
 
-							OSGi<? extends S> program = fun.apply(service);
+							OSGiImpl<S> program =
+								(OSGiImpl<S>)fun.apply(service);
 
-							Tracked<T, S> tracked = new Tracked<>();
+							OSGiResultImpl<S> result = program._operation.run(
+								bundleContext);
+
+							result.pipeTo(addedSource, removedSource);
 
-							OSGiResult<? extends S> result = program.run(
-								bundleContext, s -> {
-									Tuple<S> tuple = Tuple.create(s);
-
-									tracked.result = tuple;
-
-									addedSource.accept(tuple);
-								}
-							);
+							Tracked<T, S> tracked = new Tracked<>();
 
+							tracked.result = result;
 							tracked.service = service;
-							tracked.program = result;
 
 							return tracked;
 						}
@@ -163,18 +159,12 @@ public class ServicesOSGi<T> extends OSG
 							ServiceReference<T> reference,
 							Tracked<T, S> tracked) {
 
-							tracked.program.close();
-
-							if (tracked.result != null) {
-								removedSource.accept(tracked.result);
-							}
+							tracked.result.close();
 
 							ServiceObjects<T> serviceObjects =
-								bundleContext.getServiceObjects(
-									reference);
+								bundleContext.getServiceObjects(reference);
 
-							serviceObjects.ungetService(
-								tracked.service);
+							serviceObjects.ungetService(tracked.service);
 						}
 					});
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java?rev=1806482&r1=1806481&r2=1806482&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tracked.java Mon Aug 28 18:37:37 2017
@@ -19,14 +19,14 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiResult;
 
+
 /**
  * @author Carlos Sierra Andrés
  */
 class Tracked<T, S> {
 
 	T service = null;
-	OSGiResult<? extends S> program = null;
 
-	Tuple<S> result = null;
+	OSGiResult<? extends S> result = null;
 
 }