You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/10/10 15:52:17 UTC

svn commit: r1811730 - in /aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal: OSGiImpl.java OSGiResultImpl.java RouteOsgiImpl.java Tuple.java

Author: csierra
Date: Tue Oct 10 15:52:17 2017
New Revision: 1811730

URL: http://svn.apache.org/viewvc?rev=1811730&view=rev
Log:
[Component-DSL] Add atomic guards

Make sure that, if an effect has been executed, the "counter effect" will
be fired.

Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Tue Oct 10 15:52:17 2017
@@ -54,20 +54,23 @@ public class OSGiImpl<T> implements OSGi
 	public OSGi<Void> foreach(
 		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
+		return OSGi.ignore(effects(onAdded, onRemoved));
+	}
+
+	@Override
+	public OSGi<T> effects(
+		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+
 		return new OSGiImpl<>((bundleContext, op) ->
 			_operation.run(
 				bundleContext,
-            	t -> {
-                	t.onTermination(() -> onRemoved.accept(t._t));
-
-                	onAdded.accept(t._t);
-
-					Tuple<Void> tuple = Tuple.create(null);
+				t -> {
+					onAdded.accept(t._t);
 
-					t.addRelatedTuple(tuple);
+					op.accept(t);
 
-					op.accept(tuple);
-            	}));
+					t.onTermination(() -> onRemoved.accept(t._t));
+				}));
 	}
 
 	@Override
@@ -84,16 +87,7 @@ public class OSGiImpl<T> implements OSGi
 	@Override
 	public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) {
 		OSGiResultImpl osgiResult =
-			_operation.run(
-				bundleContext,
-				t -> {
-					if (!t.isClosed()) {
-						andThen.accept(t._t);
-					}
-					if (t.isClosed()) {
-						t.terminate();
-					}
-				});
+			_operation.run(bundleContext, t -> andThen.accept(t._t));
 
 		osgiResult.start();
 
@@ -233,12 +227,9 @@ public class OSGiImpl<T> implements OSGi
 		Consumer<Tuple<S>> addedSource, Tuple<Function<T, S>> fTuple,
 		Tuple<T> t) {
 
-		S result = fTuple.getContent().apply(t.getContent());
-
-		Tuple<S> tuple = Tuple.create(result);
+		Tuple<S> tuple = t.map(fTuple.getContent());
 
 		fTuple.addRelatedTuple(tuple);
-		t.addRelatedTuple(tuple);
 
 		addedSource.accept(tuple);
 	}

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Tue Oct 10 15:52:17 2017
@@ -19,16 +19,19 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiResult;
 
-import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class OSGiResultImpl implements OSGiResult {
 
-	public Runnable start;
-	public Runnable close;
+	private final Runnable start;
+	private final Runnable close;
+	private AtomicBoolean _working = new AtomicBoolean();
+	private AtomicBoolean _closed = new AtomicBoolean();
+	private volatile boolean _started = false;
+
 
 	public OSGiResultImpl(Runnable start, Runnable close) {
 		this.start = start;
@@ -37,12 +40,38 @@ public class OSGiResultImpl implements O
 
 	@Override
 	public void start() {
-		start.run();
+		if (_working.compareAndSet(false, true)) {
+
+			if (!_started && !_closed.get()) {
+				try {
+					start.run();
+
+					_started = true;
+				}
+				catch (Exception e) {
+				}
+			}
+
+			_working.set(false);
+		}
+
 	}
 
 	@Override
 	public void close() {
-		close.run();
+		while (!_working.compareAndSet(false, true)) {
+			Thread.yield();
+		}
+
+		if (_closed.compareAndSet(false, true) && _started) {
+			try {
+				close.run();
+			}
+			catch (Exception e) {
+			}
+		}
+
+		_working.set(false);
 	}
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java Tue Oct 10 15:52:17 2017
@@ -44,11 +44,11 @@ public class RouteOsgiImpl<T> extends OS
             return new OSGiResultImpl(
                 () -> {
                     router._start.run();
-                    osgiResult.start.run();
+                    osgiResult.start();
                 },
                 () -> {
                     router._close.run();
-                    osgiResult.close.run();
+                    osgiResult.close();
                 });
         });
     }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java Tue Oct 10 15:52:17 2017
@@ -24,6 +24,8 @@ import org.apache.aries.osgi.functional.
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 /**
@@ -32,25 +34,35 @@ import java.util.function.Function;
 class Tuple<T> implements Event<T> {
 
 	public final T _t;
-	private final Deque<Runnable> _closingHandlers = new LinkedList<>();
+	private final Deque<Runnable> _closingHandlers =
+		new ConcurrentLinkedDeque<>();
 	private final ConcurrentDoublyLinkedList<Tuple<?>> _relatedTuples =
 		new ConcurrentDoublyLinkedList<>();
-	private volatile boolean _closed = false;
+	private AtomicBoolean _closed = new AtomicBoolean();
+	private AtomicBoolean _working = new AtomicBoolean();
 
 	private Tuple(T t) {
 		_t = t;
 	}
 
 	public void addRelatedTuple(Tuple<?> tuple) {
-		if (_closed) {
-			tuple.terminate();
-
-			return;
+		while (!_working.compareAndSet(false, true)) {
+			Thread.yield();
 		}
+		try {
+			if (_closed.get()) {
+                tuple.terminate();
+
+                return;
+            }
 
-		ConcurrentDoublyLinkedList.Node node = _relatedTuples.addLast(tuple);
+			ConcurrentDoublyLinkedList.Node node = _relatedTuples.addLast(tuple);
 
-		tuple.onTermination(node::remove);
+			tuple.onTermination(node::remove);
+		}
+		finally {
+			_working.set(false);
+		}
 	}
 
 	public static <T> Tuple<T> create(T t) {
@@ -73,7 +85,7 @@ class Tuple<T> implements Event<T> {
 	}
 
 	public boolean isClosed() {
-		return _closed;
+		return _closed.get();
 	}
 
 	public <S> Tuple<S> map(Function<? super T, ? extends S> fun) {
@@ -85,17 +97,35 @@ class Tuple<T> implements Event<T> {
 	}
 
 	public void onTermination(Runnable terminator) {
-		if (_closed) {
-			terminator.run();
+		while (!_working.compareAndSet(false, true)) {
+			Thread.yield();
+		}
+		try {
+			if (_closed.get()) {
+                terminator.run();
+
+                return;
+            }
 
-			return;
+			_closingHandlers.push(terminator);
+		}
+		finally {
+			_working.set(false);
 		}
 
-		_closingHandlers.push(terminator);
 	}
 
 	public void terminate() {
-		_closed = true;
+		while (!_working.compareAndSet(false, true)) {
+		}
+		try {
+			if (!_closed.compareAndSet(false, true)) {
+                return;
+            }
+		}
+		finally {
+			_working.set(false);
+		}
 
 		Iterator<Tuple<?>> iterator = _relatedTuples.iterator();
 
@@ -120,6 +150,7 @@ class Tuple<T> implements Event<T> {
 			catch (Exception e) {
 			}
 		}
+
 	}
 
 }