You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/10/10 15:52:17 UTC
svn commit: r1811730 - in
/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal:
OSGiImpl.java OSGiResultImpl.java RouteOsgiImpl.java Tuple.java
Author: csierra
Date: Tue Oct 10 15:52:17 2017
New Revision: 1811730
URL: http://svn.apache.org/viewvc?rev=1811730&view=rev
Log:
[Component-DSL] Add atomic guards
Make sure that, if an effect has been executed the "counter effect" will
be fired
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Tue Oct 10 15:52:17 2017
@@ -54,20 +54,23 @@ public class OSGiImpl<T> implements OSGi
public OSGi<Void> foreach(
Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+ return OSGi.ignore(effects(onAdded, onRemoved));
+ }
+
+ @Override
+ public OSGi<T> effects(
+ Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+
return new OSGiImpl<>((bundleContext, op) ->
_operation.run(
bundleContext,
- t -> {
- t.onTermination(() -> onRemoved.accept(t._t));
-
- onAdded.accept(t._t);
-
- Tuple<Void> tuple = Tuple.create(null);
+ t -> {
+ onAdded.accept(t._t);
- t.addRelatedTuple(tuple);
+ op.accept(t);
- op.accept(tuple);
- }));
+ t.onTermination(() -> onRemoved.accept(t._t));
+ }));
}
@Override
@@ -84,16 +87,7 @@ public class OSGiImpl<T> implements OSGi
@Override
public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) {
OSGiResultImpl osgiResult =
- _operation.run(
- bundleContext,
- t -> {
- if (!t.isClosed()) {
- andThen.accept(t._t);
- }
- if (t.isClosed()) {
- t.terminate();
- }
- });
+ _operation.run(bundleContext, t -> andThen.accept(t._t));
osgiResult.start();
@@ -233,12 +227,9 @@ public class OSGiImpl<T> implements OSGi
Consumer<Tuple<S>> addedSource, Tuple<Function<T, S>> fTuple,
Tuple<T> t) {
- S result = fTuple.getContent().apply(t.getContent());
-
- Tuple<S> tuple = Tuple.create(result);
+ Tuple<S> tuple = t.map(fTuple.getContent());
fTuple.addRelatedTuple(tuple);
- t.addRelatedTuple(tuple);
addedSource.accept(tuple);
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Tue Oct 10 15:52:17 2017
@@ -19,16 +19,19 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGiResult;
-import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Carlos Sierra Andrés
*/
public class OSGiResultImpl implements OSGiResult {
- public Runnable start;
- public Runnable close;
+ private final Runnable start;
+ private final Runnable close;
+ private AtomicBoolean _working = new AtomicBoolean();
+ private AtomicBoolean _closed = new AtomicBoolean();
+ private volatile boolean _started = false;
+
public OSGiResultImpl(Runnable start, Runnable close) {
this.start = start;
@@ -37,12 +40,38 @@ public class OSGiResultImpl implements O
@Override
public void start() {
- start.run();
+ if (_working.compareAndSet(false, true)) {
+
+ if (!_started && !_closed.get()) {
+ try {
+ start.run();
+
+ _started = true;
+ }
+ catch (Exception e) {
+ }
+ }
+
+ _working.set(false);
+ }
+
}
@Override
public void close() {
- close.run();
+ while (!_working.compareAndSet(false, true)) {
+ Thread.yield();
+ }
+
+ if (_closed.compareAndSet(false, true) && _started) {
+ try {
+ close.run();
+ }
+ catch (Exception e) {
+ }
+ }
+
+ _working.set(false);
}
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java Tue Oct 10 15:52:17 2017
@@ -44,11 +44,11 @@ public class RouteOsgiImpl<T> extends OS
return new OSGiResultImpl(
() -> {
router._start.run();
- osgiResult.start.run();
+ osgiResult.start();
},
() -> {
router._close.run();
- osgiResult.close.run();
+ osgiResult.close();
});
});
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1811730&r1=1811729&r2=1811730&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java Tue Oct 10 15:52:17 2017
@@ -24,6 +24,8 @@ import org.apache.aries.osgi.functional.
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
@@ -32,25 +34,35 @@ import java.util.function.Function;
class Tuple<T> implements Event<T> {
public final T _t;
- private final Deque<Runnable> _closingHandlers = new LinkedList<>();
+ private final Deque<Runnable> _closingHandlers =
+ new ConcurrentLinkedDeque<>();
private final ConcurrentDoublyLinkedList<Tuple<?>> _relatedTuples =
new ConcurrentDoublyLinkedList<>();
- private volatile boolean _closed = false;
+ private AtomicBoolean _closed = new AtomicBoolean();
+ private AtomicBoolean _working = new AtomicBoolean();
private Tuple(T t) {
_t = t;
}
public void addRelatedTuple(Tuple<?> tuple) {
- if (_closed) {
- tuple.terminate();
-
- return;
+ while (!_working.compareAndSet(false, true)) {
+ Thread.yield();
}
+ try {
+ if (_closed.get()) {
+ tuple.terminate();
+
+ return;
+ }
- ConcurrentDoublyLinkedList.Node node = _relatedTuples.addLast(tuple);
+ ConcurrentDoublyLinkedList.Node node = _relatedTuples.addLast(tuple);
- tuple.onTermination(node::remove);
+ tuple.onTermination(node::remove);
+ }
+ finally {
+ _working.set(false);
+ }
}
public static <T> Tuple<T> create(T t) {
@@ -73,7 +85,7 @@ class Tuple<T> implements Event<T> {
}
public boolean isClosed() {
- return _closed;
+ return _closed.get();
}
public <S> Tuple<S> map(Function<? super T, ? extends S> fun) {
@@ -85,17 +97,35 @@ class Tuple<T> implements Event<T> {
}
public void onTermination(Runnable terminator) {
- if (_closed) {
- terminator.run();
+ while (!_working.compareAndSet(false, true)) {
+ Thread.yield();
+ }
+ try {
+ if (_closed.get()) {
+ terminator.run();
+
+ return;
+ }
- return;
+ _closingHandlers.push(terminator);
+ }
+ finally {
+ _working.set(false);
}
- _closingHandlers.push(terminator);
}
public void terminate() {
- _closed = true;
+ while (!_working.compareAndSet(false, true)) {
+ }
+ try {
+ if (!_closed.compareAndSet(false, true)) {
+ return;
+ }
+ }
+ finally {
+ _working.set(false);
+ }
Iterator<Tuple<?>> iterator = _relatedTuples.iterator();
@@ -120,6 +150,7 @@ class Tuple<T> implements Event<T> {
catch (Exception e) {
}
}
+
}
}