You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/11/17 16:00:41 UTC
svn commit: r1815581 - in /aries/trunk/component-dsl:
component-dsl/src/main/java/org/apache/aries/osgi/functional/
component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/
itests/src/main/java/org/apache/aries/osgi/functional/internal/ i...
Author: csierra
Date: Fri Nov 17 16:00:41 2017
New Revision: 1815581
URL: http://svn.apache.org/viewvc?rev=1815581&view=rev
Log:
[Component-DSL] Refactor and source formatting
Added:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java
- copied, changed from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java
- copied, changed from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
Removed:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Event.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java Fri Nov 17 16:00:41 2017
@@ -31,9 +31,6 @@ import java.util.stream.Collectors;
public class CachingServiceReference<T>
implements Comparable<CachingServiceReference<T>> {
- private final ConcurrentHashMap<String, Object> _properties;
- private final ServiceReference<T> _serviceReference;
-
public CachingServiceReference(ServiceReference<T> serviceReference) {
_properties = new ConcurrentHashMap<>();
_serviceReference = serviceReference;
@@ -140,12 +137,6 @@ public class CachingServiceReference<T>
*/
public ServiceReference<T> getServiceReference() {
return _serviceReference;
- } @Override
- public String toString() {
- return "CachingServiceReference{" +
- "cachedProperties=" + _properties + ", " +
- "serviceReference=" + _serviceReference +
- '}';
}
@Override
@@ -162,7 +153,6 @@ public class CachingServiceReference<T>
return _serviceReference.equals(that._serviceReference);
}
-
/**
* Checks if any of the cached properties has a different value in the
* underlying {@link ServiceReference}. Only properties that have been
@@ -180,6 +170,14 @@ public class CachingServiceReference<T>
);
}
+ @Override
+ public String toString() {
+ return "CachingServiceReference{" +
+ "cachedProperties=" + _properties + ", " +
+ "serviceReference=" + _serviceReference +
+ '}';
+ }
+
/**
* Checks if the property is dirty in this instance without caching the
* value. Trying to do the same using getProperty would cache the property
@@ -194,6 +192,9 @@ public class CachingServiceReference<T>
!value.equals(_serviceReference.getProperty(key));
}
+ private final ConcurrentHashMap<String, Object> _properties;
+ private final ServiceReference<T> _serviceReference;
+
private static class NULL {
private static NULL INSTANCE = new NULL();
@@ -208,6 +209,4 @@ public class CachingServiceReference<T>
}
}
-
-
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Fri Nov 17 16:00:41 2017
@@ -59,42 +59,12 @@ import java.util.function.Supplier;
public interface OSGi<T> extends OSGiRunnable<T> {
Runnable NOOP = () -> {};
- <S> OSGi<S> choose(
- Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
- Function<OSGi<T>, OSGi<S>> otherwise);
-
- <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
-
- <K, S> OSGi<S> splitBy(
- Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
-
- OSGi<T> recover(BiFunction<T, Exception, T> onError);
-
- OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
-
- OSGi<T> effects(
- Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
-
- <S> OSGi<S> map(Function<? super T, ? extends S> function);
-
- <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
-
- <S> OSGi<S> then(OSGi<S> next);
-
- OSGi<Void> foreach(Consumer<? super T> onAdded);
-
- OSGi<Void> foreach(
- Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
-
- <S> OSGi<S> transform(
- Function<Function<S, Runnable>, Function<T, Runnable>> fun);
-
- static OSGi<Void> ignore(OSGi<?> program) {
- return new IgnoreImpl(program);
+ @SafeVarargs
+ static <T> OSGi<T> all(OSGi<T> ... programs) {
+ return new AllOSGi<>(programs);
}
static OSGi<BundleContext> bundleContext() {
-
return new BundleContextOSGiImpl();
}
@@ -108,6 +78,26 @@ public interface OSGi<T> extends OSGiRun
return new ChangeContextOSGiImpl<>(program, bundleContext);
}
+ static <A, B, C> OSGi<C> combine(Function2<A, B, C> fun, OSGi<A> a, OSGi<B> b) {
+ return b.applyTo(a.applyTo(just(fun.curried())));
+ }
+
+ static <A, B, C, D> OSGi<D> combine(Function3<A, B, C, D> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c) {
+ return c.applyTo(OSGi.combine((A aa, B bb) -> fun.curried().apply(aa).apply(bb), a, b));
+ }
+
+ static <A, B, C, D, E> OSGi<E> combine(Function4<A, B, C, D, E> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d) {
+ return d.applyTo(OSGi.combine((A aa, B bb, C cc) -> fun.curried().apply(aa).apply(bb).apply(cc), a, b, c));
+ }
+
+ static <A, B, C, D, E, F> OSGi<F> combine(Function5<A, B, C, D, E, F> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e) {
+ return e.applyTo(OSGi.combine((A aa, B bb, C cc, D dd) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd), a, b, c, d));
+ }
+
+ static <A, B, C, D, E, F, G> OSGi<G> combine(Function6<A, B, C, D, E, F, G> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e, OSGi<F> f) {
+ return f.applyTo(OSGi.combine((A aa, B bb, C cc, D dd, E ee) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd).apply(ee), a, b, c, d, e));
+ }
+
static OSGi<Dictionary<String, ?>> configuration(String pid) {
return new ConfigurationOSGiImpl(pid);
}
@@ -116,6 +106,14 @@ public interface OSGi<T> extends OSGiRun
return new ConfigurationsOSGiImpl(factoryPid);
}
+ static OSGi<Void> ignore(OSGi<?> program) {
+ return new IgnoreImpl(program);
+ }
+
+ static <S> OSGi<S> join(OSGi<OSGi<S>> program) {
+ return program.flatMap(x -> x);
+ }
+
static <S> OSGi<S> just(S s) {
return new JustOSGiImpl<>(s);
}
@@ -128,10 +126,6 @@ public interface OSGi<T> extends OSGiRun
return new JustOSGiImpl<>(() -> Collections.singletonList(s.get()));
}
- static <S> OSGi<S> join(OSGi<OSGi<S>> program) {
- return program.flatMap(x -> x);
- }
-
static <S> OSGi<S> nothing() {
return new NothingOSGiImpl<>();
}
@@ -140,6 +134,28 @@ public interface OSGi<T> extends OSGiRun
return new OnCloseOSGiImpl(action);
}
+ static <T> OSGi<T> once(OSGi<T> program) {
+ return program.transform(op -> {
+ AtomicInteger count = new AtomicInteger();
+
+ AtomicReference<Runnable> terminator = new AtomicReference<>();
+
+ return t -> {
+ if (count.getAndIncrement() == 0) {
+ terminator.set(op.apply(t));
+ }
+
+ return () -> {
+ if (count.decrementAndGet() == 0) {
+ Runnable runnable = terminator.getAndSet(NOOP);
+
+ runnable.run();
+ }
+ };
+ };
+ });
+ }
+
static OSGi<ServiceObjects<Object>> prototypes(String filterString) {
return prototypes(null, filterString);
}
@@ -180,55 +196,6 @@ public interface OSGi<T> extends OSGiRun
return new ServiceRegistrationOSGiImpl(classes, service, properties);
}
- static <T> OSGi<T> services(Class<T> clazz) {
- return services(clazz, null);
- }
-
- static <T> OSGi<Object> services(String filterString) {
- return services(null, filterString);
- }
-
- static <T> OSGi<T> services(Class<T> clazz, String filterString) {
- return
- bundleContext().flatMap(
- bundleContext ->
-
- serviceReferences(clazz, filterString).map(
- CachingServiceReference::getServiceReference
- ).flatMap(
- sr -> {
- T service = bundleContext.getService(sr);
-
- return
- onClose(() -> bundleContext.ungetService(sr)).then(
- just(service)
- );
- }
- ));
- }
-
- public static <T> OSGi<T> once(OSGi<T> program) {
- return program.transform(op -> {
- AtomicInteger count = new AtomicInteger();
-
- AtomicReference<Runnable> terminator = new AtomicReference<>();
-
- return t -> {
- if (count.getAndIncrement() == 0) {
- terminator.set(op.apply(t));
- }
-
- return () -> {
- if (count.decrementAndGet() == 0) {
- Runnable runnable = terminator.getAndSet(NOOP);
-
- runnable.run();
- }
- };
- };
- });
- }
-
static <T> OSGi<CachingServiceReference<T>> serviceReferences(
Class<T> clazz) {
@@ -267,35 +234,66 @@ public interface OSGi<T> extends OSGiRun
return new ServiceReferenceOSGi<>(filterString, null, onModified);
}
- @SafeVarargs
- static <T> OSGi<T> all(OSGi<T> ... programs) {
- return new AllOSGi<>(programs);
+ static <T> OSGi<T> services(Class<T> clazz) {
+ return services(clazz, null);
}
- OSGi<T> filter(Predicate<T> predicate);
-
- public default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
- return fun.flatMap(this::map);
+ static <T> OSGi<Object> services(String filterString) {
+ return services(null, filterString);
}
- public static <A, B, C> OSGi<C> combine(Function2<A, B, C> fun, OSGi<A> a, OSGi<B> b) {
- return b.applyTo(a.applyTo(just(fun.curried())));
- }
+ static <T> OSGi<T> services(Class<T> clazz, String filterString) {
+ return
+ bundleContext().flatMap(
+ bundleContext ->
- public static <A, B, C, D> OSGi<D> combine(Function3<A, B, C, D> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c) {
- return c.applyTo(OSGi.combine((A aa, B bb) -> fun.curried().apply(aa).apply(bb), a, b));
- }
+ serviceReferences(clazz, filterString).map(
+ CachingServiceReference::getServiceReference
+ ).flatMap(
+ sr -> {
+ T service = bundleContext.getService(sr);
- public static <A, B, C, D, E> OSGi<E> combine(Function4<A, B, C, D, E> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d) {
- return d.applyTo(OSGi.combine((A aa, B bb, C cc) -> fun.curried().apply(aa).apply(bb).apply(cc), a, b, c));
+ return
+ onClose(() -> bundleContext.ungetService(sr)).then(
+ just(service)
+ );
+ }
+ ));
}
- public static <A, B, C, D, E, F> OSGi<F> combine(Function5<A, B, C, D, E, F> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e) {
- return e.applyTo(OSGi.combine((A aa, B bb, C cc, D dd) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd), a, b, c, d));
+ default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+ return fun.flatMap(this::map);
}
- public static <A, B, C, D, E, F, G> OSGi<G> combine(Function6<A, B, C, D, E, F, G> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e, OSGi<F> f) {
- return f.applyTo(OSGi.combine((A aa, B bb, C cc, D dd, E ee) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd).apply(ee), a, b, c, d, e));
- }
+ <S> OSGi<S> choose(
+ Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
+ Function<OSGi<T>, OSGi<S>> otherwise);
+
+ <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
+
+ OSGi<T> effects(
+ Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
+
+ OSGi<T> filter(Predicate<T> predicate);
+
+ <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
+
+ OSGi<Void> foreach(Consumer<? super T> onAdded);
+
+ OSGi<Void> foreach(
+ Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
+
+ <S> OSGi<S> map(Function<? super T, ? extends S> function);
+
+ OSGi<T> recover(BiFunction<T, Exception, T> onError);
+
+ OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
+
+ <K, S> OSGi<S> splitBy(
+ Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
+
+ <S> OSGi<S> then(OSGi<S> next);
+
+ <S> OSGi<S> transform(Transformer<T, S> fun);
}
Copied: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java (from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java)
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java&p1=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java&r1=1815580&r2=1815581&rev=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java Fri Nov 17 16:00:41 2017
@@ -17,12 +17,17 @@
package org.apache.aries.osgi.functional;
+import java.util.function.Function;
+
/**
* @author Carlos Sierra Andrés
*/
-public interface SentEvent<T> {
+public interface Publisher<T> extends Function<T, Runnable> {
+
+ default Runnable apply(T t) {
+ return publish(t);
+ }
- Event<T> getEvent();
+ Runnable publish(T t);
- void terminate();
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java Fri Nov 17 16:00:41 2017
@@ -6,5 +6,13 @@ import java.util.function.Function;
* @author Carlos Sierra Andrés
*/
public interface Transformer<T, R> extends
- Function<Function<R, Runnable>, Function<T, Runnable>> {
+ Function<Publisher<R>, Publisher<T>> {
+
+ @Override
+ default Publisher<T> apply(Publisher<R> pipe) {
+ return transform(pipe);
+ }
+
+ Publisher<T> transform(Publisher<R> pipe);
+
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java Fri Nov 17 16:00:41 2017
@@ -1,24 +1,19 @@
package org.apache.aries.osgi.functional;
-import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList;
+import org.apache.aries.osgi.functional.internal.AccumulateTransformer;
import org.apache.aries.osgi.functional.internal.HighestRankingOSGi;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiPredicate;
import java.util.function.Function;
-import static org.apache.aries.osgi.functional.OSGi.NOOP;
-
/**
* @author Carlos Sierra Andrés
*/
public interface Utils {
- static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
- return highest(program, Comparator.naturalOrder());
+ static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
+ return program.transform(new AccumulateTransformer<>());
}
static <T> OSGi<T> highest(
@@ -28,58 +23,14 @@ public interface Utils {
}
static <T> OSGi<T> highest(
- OSGi<T> program, Comparator<? super T> comparator, Function<OSGi<T>, OSGi<T>> notHighest) {
+ OSGi<T> program, Comparator<? super T> comparator,
+ Function<OSGi<T>, OSGi<T>> notHighest) {
return new HighestRankingOSGi<>(program, comparator, notHighest);
}
- static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
- return program.transform(op -> {
- ConcurrentDoublyLinkedList<T> list =
- new ConcurrentDoublyLinkedList<>();
-
- AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
-
- return t -> {
- ConcurrentDoublyLinkedList.Node node = list.addLast(t);
-
- publish(op, list, terminator);
-
- return () -> {
- node.remove();
-
- publish(op, list, terminator);
- };
- };
- });
- }
-
- static <T> void publish(Function<List<T>, Runnable> op, ConcurrentDoublyLinkedList<T> list, AtomicReference<Runnable> terminator) {
- Runnable runnable = terminator.get();
-
- runnable.run();
-
- terminator.set(op.apply(new ArrayList<>(list)));
- }
-
- static <T> OSGi<T> republishIf(
- BiPredicate<T, T> refresher, OSGi<T> program) {
-
- return program.transform(op -> {
- AtomicReference<T> old = new AtomicReference<>();
- AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
-
- return t -> {
- if (refresher.test(old.get(), t)) {
- terminator.get().run();
-
- old.set(t);
- terminator.set(op.apply(t));
- }
-
- return () -> {};
- };
- });
+ static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
+ return highest(program, Comparator.naturalOrder());
}
}
Added: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java?rev=1815581&view=auto
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java (added)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java Fri Nov 17 16:00:41 2017
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.Publisher;
+import org.apache.aries.osgi.functional.Transformer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class AccumulateTransformer<T> implements Transformer<T, List<T>> {
+
+ @Override
+ public Publisher<T> transform(Publisher<List<T>> op) {
+ ConcurrentDoublyLinkedList<T> list =
+ new ConcurrentDoublyLinkedList<>();
+
+ AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
+
+ return t -> {
+ ConcurrentDoublyLinkedList.Node node = list.addLast(t);
+
+ publish(op, list, terminator);
+
+ return () -> {
+ node.remove();
+
+ publish(op, list, terminator);
+ };
+ };
+ }
+
+ private static <T> void publish(
+ Function<List<T>, Runnable> op, ConcurrentDoublyLinkedList<T> list,
+ AtomicReference<Runnable> terminator) {
+
+ Runnable runnable = terminator.get();
+
+ runnable.run();
+
+ terminator.set(op.apply(new ArrayList<>(list)));
+ }
+
+}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java Fri Nov 17 16:00:41 2017
@@ -17,23 +17,16 @@
package org.apache.aries.osgi.functional.internal;
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleEvent;
import org.osgi.util.tracker.BundleTracker;
import org.osgi.util.tracker.BundleTrackerCustomizer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
/**
* @author Carlos Sierra Andrés
*/
public class BundleOSGi extends OSGiImpl<Bundle> {
- private final int _stateMask;
-
public BundleOSGi(int stateMask) {
super((bundleContext, op) -> {
BundleTracker<Runnable> bundleTracker =
@@ -68,54 +61,6 @@ public class BundleOSGi extends OSGiImpl
bundleTracker::open, bundleTracker::close);
});
- _stateMask = stateMask;
- }
-
- @Override
- public <S> OSGiImpl<S> flatMap(
- Function<? super Bundle, OSGi<? extends S>> fun) {
-
- return new OSGiImpl<>((bundleContext, op) -> {
- BundleTracker<OSGiResult> bundleTracker =
- new BundleTracker<>(
- bundleContext, _stateMask,
- new BundleTrackerCustomizer<OSGiResult>() {
-
- @Override
- public OSGiResult addingBundle(
- Bundle bundle, BundleEvent bundleEvent) {
-
- OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(
- bundle);
-
- OSGiResultImpl result = program._operation.run(
- bundleContext, op);
-
- result.start();
-
- return result;
- }
-
- @Override
- public void modifiedBundle(
- Bundle bundle, BundleEvent bundleEvent,
- OSGiResult result) {
-
- }
-
- @Override
- public void removedBundle(
- Bundle bundle, BundleEvent bundleEvent,
- OSGiResult result) {
-
- result.close();
- }
- });
-
- return new OSGiResultImpl(
- bundleTracker::open, bundleTracker::close);
-
- });
}
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java Fri Nov 17 16:00:41 2017
@@ -28,8 +28,7 @@ import java.util.concurrent.atomic.Atomi
/**
* @author Carlos Sierra Andrés
*/
-public class ConfigurationOSGiImpl
- extends OSGiImpl<Dictionary<String, ?>> {
+public class ConfigurationOSGiImpl extends OSGiImpl<Dictionary<String, ?>> {
public ConfigurationOSGiImpl(String pid) {
super((bundleContext, op) -> {
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java Fri Nov 17 16:00:41 2017
@@ -40,10 +40,10 @@ public class HighestRankingOSGi<T> exten
comparing.reversed());
AtomicReference<Tuple<T>> sent = new AtomicReference<>();
- Function<T, Runnable> notHighestPipe = ProbeImpl.getProbePipe(
- notHighest, bundleContext, __ -> () -> {});
+ Pad<T, T> notHighestPad = new Pad<>(
+ bundleContext, notHighest, __ -> NOOP);
- return ((OSGiImpl<T>)previous)._operation.run(
+ OSGiResultImpl result = ((OSGiImpl<T>) previous)._operation.run(
bundleContext,
t -> {
Tuple<T> tuple = new Tuple<>(t);
@@ -57,15 +57,14 @@ public class HighestRankingOSGi<T> exten
if (old != null) {
old._runnable.run();
- old._runnable = notHighestPipe.apply(old._t);
+ old._runnable = notHighestPad.publish(old._t);
}
tuple._runnable = highestPipe.apply(t);
sent.set(tuple);
- }
- else {
- tuple._runnable = notHighestPipe.apply(t);
+ } else {
+ tuple._runnable = notHighestPad.publish(t);
}
}
@@ -91,14 +90,19 @@ public class HighestRankingOSGi<T> exten
}
};
});
+
+ return new OSGiResultImpl(
+ result::start,
+ () -> {
+ result.close();
+
+ notHighestPad.close();
+ });
});
}
private static class Tuple<T> {
- T _t;
- Runnable _runnable;
-
Tuple(T t) {
_t = t;
}
@@ -106,6 +110,8 @@ public class HighestRankingOSGi<T> exten
public T getT() {
return _t;
}
+ T _t;
+ Runnable _runnable;
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java Fri Nov 17 16:00:41 2017
@@ -27,8 +27,7 @@ public class IgnoreImpl extends OSGiImpl
public IgnoreImpl(OSGi<?> program) {
super((bundleContext, op) ->
- ((OSGiImpl<?>) program)._operation.run(
- bundleContext, t -> () -> {}));
+ ((OSGiImpl<?>) program)._operation.run(bundleContext, t -> NOOP));
}
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Fri Nov 17 16:00:41 2017
@@ -19,7 +19,8 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.Utils;
+import org.apache.aries.osgi.functional.Publisher;
+import org.apache.aries.osgi.functional.Transformer;
import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList.Node;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
@@ -47,27 +48,76 @@ public class OSGiImpl<T> implements OSGi
}
@Override
- public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
- return new FlatMapImpl<>(this, fun);
- }
+ public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+ return new OSGiImpl<>(
+ (bundleContext, op) -> {
+ AtomicReference<OSGiResult> myCloseReference =
+ new AtomicReference<>();
- @Override
- public OSGi<Void> foreach(Consumer<? super T> onAdded) {
- return foreach(onAdded, ign -> {});
- }
+ AtomicReference<OSGiResult> otherCloseReference =
+ new AtomicReference<>();
- @Override
- public OSGi<Void> foreach(
- Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+ ConcurrentDoublyLinkedList<T> identities =
+ new ConcurrentDoublyLinkedList<>();
- return OSGi.ignore(effects(onAdded, onRemoved));
- }
+ ConcurrentDoublyLinkedList<Function<T, S>> funs =
+ new ConcurrentDoublyLinkedList<>();
- @Override
- public <S> OSGi<S> transform(
- Function<Function<S, Runnable>, Function<T, Runnable>> fun) {
+ return new OSGiResultImpl(
+ () -> {
+ OSGiResultImpl or1 = _operation.run(
+ bundleContext,
+ t -> {
+ Node node = identities.addLast(t);
- return new TransformerOSGi<>(this, fun);
+ List<Runnable> terminators = funs.stream().map(
+ f -> op.apply(f.apply(t))
+ ).collect(
+ Collectors.toList()
+ );
+
+ return () -> {
+ node.remove();
+
+ terminators.forEach(Runnable::run);
+ };
+ }
+ );
+
+ myCloseReference.set(or1);
+
+ OSGiResultImpl funRun =
+ ((OSGiImpl<Function<T, S>>) fun)._operation.run(
+ bundleContext,
+ f -> {
+ Node node = funs.addLast(f);
+
+ List<Runnable> terminators =
+ identities.stream().map(
+ t -> op.apply(f.apply(t))
+ ).collect(
+ Collectors.toList()
+ );
+
+ return () -> {
+ node.remove();
+
+ terminators.forEach(Runnable::run);
+ };
+ });
+
+ otherCloseReference.set(funRun);
+
+ or1.start();
+
+ funRun.start();
+ },
+ () -> {
+ myCloseReference.get().close();
+
+ otherCloseReference.get().close();
+ });
+ });
}
@Override
@@ -76,20 +126,25 @@ public class OSGiImpl<T> implements OSGi
Function<OSGi<T>, OSGi<S>> otherwise) {
return new OSGiImpl<>((bundleContext, publisher) -> {
- Function<T, Runnable> thenPipe = ProbeImpl.getProbePipe(then, bundleContext, publisher);
-
- Function<T, Runnable> elsePipe = ProbeImpl.getProbePipe(otherwise, bundleContext, publisher);
+ Pad<T, S> thenPad = new Pad<>(bundleContext, then, publisher);
+ Pad<T, S> elsePad = new Pad<>(bundleContext, otherwise, publisher);
- return _operation.run(
+ OSGiResultImpl result = _operation.run(
bundleContext,
t -> {
if (chooser.test(t)) {
- return thenPipe.apply(t);
- }
- else {
- return elsePipe.apply(t);
+ return thenPad.publish(t);
+ } else {
+ return elsePad.publish(t);
}
});
+ return new OSGiResultImpl(
+ result::start,
+ () -> {
+ thenPad.close();
+ elsePad.close();
+ result.close();
+ });
});
}
@@ -97,60 +152,102 @@ public class OSGiImpl<T> implements OSGi
@SafeVarargs
public final <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs) {
return new OSGiImpl<>((bundleContext, publisher) -> {
- List<Function<T, Runnable>> pipes =
+ List<Pad<T, S>> pads =
Arrays.stream(
funs
).map(
- fun -> ProbeImpl.getProbePipe(fun, bundleContext, publisher)
+ fun -> new Pad<>(bundleContext, fun, publisher)
).collect(
Collectors.toList()
);
- return _operation.run(
+ OSGiResultImpl result = _operation.run(
bundleContext,
t -> {
List<Runnable> terminators =
- pipes.stream().map(p -> p.apply(t)).collect(
+ pads.stream().map(p -> p.publish(t)).collect(
Collectors.toList());
return () -> {
terminators.forEach(Runnable::run);
};
});
+
+ return new OSGiResultImpl(
+ result::start,
+ () -> {
+ result.close();
+
+ pads.forEach(Pad::close);
+ });
});
}
@Override
- public <K, S> OSGi<S> splitBy(
- Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
-
- return new OSGiImpl<>((bundleContext, op) -> {
- HashMap<K, Function<T, Runnable>> pipes = new HashMap<>();
- HashMap<K, OSGiResult> results = new HashMap<>();
+ public OSGi<T> effects(
+ Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
- return _operation.run(
+ return new OSGiImpl<>((bundleContext, op) ->
+ _operation.run(
bundleContext,
t -> {
- K key = mapper.apply(t);
+ onAdded.accept(t);
- results.computeIfAbsent(key, __ -> {
- ProbeImpl<T> probe = new ProbeImpl<>();
+ Runnable terminator;
+ try {
+ terminator = op.apply(t);
+ }
+ catch (Exception e) {
+ onRemoved.accept(t);
- OSGiImpl<S> program = (OSGiImpl<S>)fun.apply(probe);
+ throw e;
+ }
- OSGiResult r = program._operation.run(
- bundleContext, op);
+ return () -> {
+ onRemoved.accept(t);
- r.start();
+ terminator.run();
+ };
+ }));
+ }
- pipes.put(key, probe.getOperation());
+ @Override
+ public OSGi<T> filter(Predicate<T> predicate) {
+ return new OSGiImpl<>((bundleContext, op) ->
+ _operation.run(
+ bundleContext,
+ (t) -> {
+ if (predicate.test(t)) {
+ return op.apply(t);
+ }
+ else {
+ return () -> {};
+ }
+ }
+ ));
+ }
- return r;
- });
+ @Override
+ public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> fun) {
+ return new FlatMapImpl<>(this, fun);
+ }
- return pipes.get(key).apply(t);
- });
- });
+ @Override
+ public OSGi<Void> foreach(Consumer<? super T> onAdded) {
+ return foreach(onAdded, ign -> {});
+ }
+
+ @Override
+ public OSGi<Void> foreach(
+ Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+
+ return OSGi.ignore(effects(onAdded, onRemoved));
+ }
+
+ @Override
+ public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
+ return new OSGiImpl<>((bundleContext, op) ->
+ _operation.run(bundleContext, t -> op.apply(function.apply(t))));
}
@Override
@@ -182,10 +279,7 @@ public class OSGiImpl<T> implements OSGi
OSGi<T> errorProgram = onError.apply(t, e);
OSGiResult result =
- ((OSGiImpl<T>) errorProgram)._operation.run(
- bundleContext, op);
-
- result.start();
+ ((OSGiImpl<T>) errorProgram).run(bundleContext, op);
return result::close;
}
@@ -194,37 +288,40 @@ public class OSGiImpl<T> implements OSGi
}
@Override
- public OSGi<T> effects(
- Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+ public <K, S> OSGi<S> splitBy(
+ Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
- return new OSGiImpl<>((bundleContext, op) ->
- _operation.run(
- bundleContext,
- t -> {
- onAdded.accept(t);
+ return new OSGiImpl<>((bundleContext, op) -> {
+ HashMap<K, Pad<T, S>> pads = new HashMap<>();
- Runnable terminator;
- try {
- terminator = op.apply(t);
- }
- catch (Exception e) {
- onRemoved.accept(t);
+ OSGiResultImpl result = _operation.run(
+ bundleContext,
+ t ->
+ pads.computeIfAbsent(
+ mapper.apply(t),
+ __ -> new Pad<>(bundleContext, fun, op)
+ ).apply(t)
+ );
- throw e;
- }
+ return new OSGiResultImpl(
+ result::start,
+ () -> {
+ pads.values().forEach(Pad::close);
- return () -> {
- onRemoved.accept(t);
+ result.close();
+ });
+ });
+ }
- terminator.run();
- };
- }));
+ @Override
+ public <S> OSGi<S> then(OSGi<S> next) {
+ return flatMap(ignored -> next);
}
@Override
- public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
- return new OSGiImpl<>((bundleContext, op) ->
- _operation.run(bundleContext, t -> op.apply(function.apply(t))));
+ public <S> OSGi<S> transform(Transformer<T, S> fun) {
+
+ return new TransformerOSGi<>(this, fun);
}
@Override
@@ -234,23 +331,15 @@ public class OSGiImpl<T> implements OSGi
@Override
public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) {
- OSGiResultImpl osgiResult =
- _operation.run(
- bundleContext,
- t -> {
- andThen.accept(t);
+ return run(bundleContext, t -> {andThen.accept(t); return NOOP;});
+ }
- return () -> {};
- });
+ public OSGiResult run(BundleContext bundleContext, Publisher<T> op) {
+ OSGiResultImpl result = _operation.run(bundleContext, op);
- osgiResult.start();
+ result.start();
- return osgiResult;
- }
-
- @Override
- public <S> OSGi<S> then(OSGi<S> next) {
- return flatMap(ignored -> next);
+ return result;
}
static Filter buildFilter(
@@ -302,95 +391,6 @@ public class OSGiImpl<T> implements OSGi
return stringBuilder.toString();
}
- @Override
- public OSGi<T> filter(Predicate<T> predicate) {
- return new OSGiImpl<>((bundleContext, op) ->
- _operation.run(
- bundleContext,
- (t) -> {
- if (predicate.test(t)) {
- return op.apply(t);
- }
- else {
- return () -> {};
- }
- }
- ));
- }
-
- @Override
- public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
- return new OSGiImpl<>(
- (bundleContext, op) -> {
- AtomicReference<OSGiResult> myCloseReference =
- new AtomicReference<>();
-
- AtomicReference<OSGiResult> otherCloseReference =
- new AtomicReference<>();
-
- ConcurrentDoublyLinkedList<T> identities =
- new ConcurrentDoublyLinkedList<>();
-
- ConcurrentDoublyLinkedList<Function<T, S>> funs =
- new ConcurrentDoublyLinkedList<>();
-
- return new OSGiResultImpl(
- () -> {
- OSGiResultImpl or1 = _operation.run(
- bundleContext,
- t -> {
- Node node = identities.addLast(t);
-
- List<Runnable> terminators = funs.stream().map(
- f -> op.apply(f.apply(t))
- ).collect(
- Collectors.toList()
- );
-
- return () -> {
- node.remove();
-
- terminators.forEach(Runnable::run);
- };
- }
- );
-
- myCloseReference.set(or1);
-
- OSGiResultImpl funRun =
- ((OSGiImpl<Function<T, S>>) fun)._operation.run(
- bundleContext,
- f -> {
- Node node = funs.addLast(f);
-
- List<Runnable> terminators =
- identities.stream().map(
- t -> op.apply(f.apply(t))
- ).collect(
- Collectors.toList()
- );
-
- return () -> {
- node.remove();
-
- terminators.forEach(Runnable::run);
- };
- });
-
- otherCloseReference.set(funRun);
-
- or1.start();
-
- funRun.start();
- },
- () -> {
- myCloseReference.get().close();
-
- otherCloseReference.get().close();
- });
- });
- }
-
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java Fri Nov 17 16:00:41 2017
@@ -19,6 +19,7 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGiOperation;
import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
import org.osgi.framework.BundleContext;
import java.util.function.Function;
@@ -28,8 +29,7 @@ import java.util.function.Function;
*/
interface OSGiOperationImpl<T> extends OSGiOperation<T> {
- OSGiResultImpl run(
- BundleContext bundleContext, Function<T, Runnable> op);
+ OSGiResultImpl run(BundleContext bundleContext, Publisher<T> op);
@Override
default OSGiResult run(BundleContext bundleContext) {
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Fri Nov 17 16:00:41 2017
@@ -26,19 +26,25 @@ import java.util.concurrent.atomic.Atomi
*/
public class OSGiResultImpl implements OSGiResult {
- private final Runnable start;
- private final Runnable close;
- private AtomicBoolean _working = new AtomicBoolean();
- private AtomicBoolean _closed = new AtomicBoolean();
- private volatile boolean _started = false;
-
-
public OSGiResultImpl(Runnable start, Runnable close) {
this.start = start;
this.close = close;
}
@Override
+ public void close() {
+ while (!_working.compareAndSet(false, true)) {
+ Thread.yield();
+ }
+
+ if (_closed.compareAndSet(false, true) && _started) {
+ close.run();
+ }
+
+ _working.set(false);
+ }
+
+ @Override
public void start() {
if (_working.compareAndSet(false, true)) {
@@ -53,17 +59,10 @@ public class OSGiResultImpl implements O
}
- @Override
- public void close() {
- while (!_working.compareAndSet(false, true)) {
- Thread.yield();
- }
-
- if (_closed.compareAndSet(false, true) && _started) {
- close.run();
- }
-
- _working.set(false);
- }
+ private final Runnable start;
+ private final Runnable close;
+ private AtomicBoolean _working = new AtomicBoolean();
+ private AtomicBoolean _closed = new AtomicBoolean();
+ private volatile boolean _started = false;
}
Copied: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java (from r1815580, aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java)
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java&p1=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java&r1=1815580&r2=1815581&rev=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java Fri Nov 17 16:00:41 2017
@@ -17,23 +17,48 @@
package org.apache.aries.osgi.functional.internal;
-import org.apache.aries.osgi.functional.OSGiOperation;
+import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
import org.osgi.framework.BundleContext;
+import java.io.Closeable;
import java.util.function.Function;
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
/**
* @author Carlos Sierra Andrés
*/
-interface OSGiOperationImpl<T> extends OSGiOperation<T> {
+public class Pad<T, S> implements Publisher<T>, Closeable {
+
+ public Pad(
+ BundleContext bundleContext,
+ Function<OSGi<T>, OSGi<S>> fun,
+ Publisher<S> continuation) {
+
+ ProbeImpl<T> probe = new ProbeImpl<>();
+
+ OSGiImpl<S> next = (OSGiImpl<S>) fun.apply(probe);
+
+ _result = next.run(bundleContext, continuation);
+
+ _publisher =
+ probe.getPublisher() != null ?
+ probe.getPublisher() :
+ __ -> NOOP;
+ }
- OSGiResultImpl run(
- BundleContext bundleContext, Function<T, Runnable> op);
+ @Override
+ public void close() {
+ _result.close();
+ }
- @Override
- default OSGiResult run(BundleContext bundleContext) {
- return run(bundleContext, (__) -> () -> {});
- }
+ @Override
+ public Runnable publish(T t) {
+ return _publisher.publish(t);
+ }
+ private final OSGiResult _result;
+ private final Publisher<T> _publisher;
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java Fri Nov 17 16:00:41 2017
@@ -17,12 +17,9 @@
package org.apache.aries.osgi.functional.internal;
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
import org.osgi.framework.BundleContext;
-import java.util.function.Function;
-
/**
* @author Carlos Sierra Andrés
*/
@@ -32,41 +29,22 @@ public class ProbeImpl<T> extends OSGiIm
super(new ProbeOperationImpl<>());
}
- public Function<T, Runnable> getOperation() {
+ public Publisher<T> getPublisher() {
return ((ProbeOperationImpl<T>) _operation)._op;
}
- public static <T, S> Function<T, Runnable> getProbePipe(
- Function<OSGi<T>, OSGi<S>> then, BundleContext bundleContext,
- Function<S, Runnable> publisher) {
-
- ProbeImpl<T> thenProbe = new ProbeImpl<>();
-
- OSGiImpl<S> thenNext = (OSGiImpl<S>) then.apply(thenProbe);
-
- OSGiResult thenResult = thenNext._operation.run(
- bundleContext, publisher);
-
- Function<T, Runnable> thenPipe = thenProbe.getOperation();
-
- thenResult.start();
-
- return thenPipe;
- }
-
private static class ProbeOperationImpl<T> implements OSGiOperationImpl<T> {
- BundleContext _bundleContext;
- Function<T, Runnable> _op;
-
@Override
public OSGiResultImpl run(
- BundleContext bundleContext, Function<T, Runnable> op) {
+ BundleContext bundleContext, Publisher<T> op) {
_bundleContext = bundleContext;
_op = op;
return new OSGiResultImpl(NOOP, NOOP);
}
+ BundleContext _bundleContext;
+ Publisher<T> _op;
}
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Fri Nov 17 16:00:41 2017
@@ -56,10 +56,6 @@ public class ServiceReferenceOSGi<T>
private static class DefaultServiceTrackerCustomizer<T>
implements ServiceTrackerCustomizer<T, Tracked<T>> {
- private final Function<CachingServiceReference<T>, Runnable>
- _addedSource;
- private Refresher<? super CachingServiceReference<T>> _refresher;
-
public DefaultServiceTrackerCustomizer(
Function<CachingServiceReference<T>, Runnable> addedSource,
Refresher<? super CachingServiceReference<T>> refresher) {
@@ -97,13 +93,15 @@ public class ServiceReferenceOSGi<T>
tracked.runnable.run();
}
+
+ private final Function<CachingServiceReference<T>, Runnable>
+ _addedSource;
+ private Refresher<? super CachingServiceReference<T>> _refresher;
+
}
private static class Tracked<T> {
- volatile CachingServiceReference<T> cachingServiceReference;
- volatile Runnable runnable;
-
public Tracked(
CachingServiceReference<T> cachingServiceReference,
Runnable runnable) {
@@ -111,5 +109,9 @@ public class ServiceReferenceOSGi<T>
this.cachingServiceReference = cachingServiceReference;
this.runnable = runnable;
}
+
+ volatile CachingServiceReference<T> cachingServiceReference;
+ volatile Runnable runnable;
+
}
}
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java Fri Nov 17 16:00:41 2017
@@ -17,19 +17,17 @@
package org.apache.aries.osgi.functional.internal;
-import java.util.function.Function;
+import org.apache.aries.osgi.functional.Transformer;
/**
* @author Carlos Sierra Andrés
*/
public class TransformerOSGi<T, R> extends OSGiImpl<R> {
- public TransformerOSGi(
- OSGiImpl<T> previous,
- Function<Function<R, Runnable>, Function<T, Runnable>> fun) {
+ public TransformerOSGi(OSGiImpl<T> previous, Transformer<T, R> fun) {
super((bundleContext, op) ->
- previous._operation.run(bundleContext, fun.apply(op)));
+ previous._operation.run(bundleContext, fun.transform(op)));
}
}
Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java Fri Nov 17 16:00:41 2017
@@ -18,14 +18,12 @@
package org.apache.aries.osgi.functional.internal;
import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.SentEvent;
import org.apache.aries.osgi.functional.test.DSLTest;
import org.junit.Ignore;
import org.junit.Test;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -70,11 +68,11 @@ public class ProbeTests {
program.run(bundleContext, result::set);
- Function<String, Runnable> opA = probeA.getOperation();
+ Function<String, Runnable> opA = probeA.getPublisher();
Runnable sentA = opA.apply("Hello");
- Function<String, Runnable> opB = probeBreference.get().getOperation();
+ Function<String, Runnable> opB = probeBreference.get().getPublisher();
sentA.run();
@@ -85,10 +83,10 @@ public class ProbeTests {
program.run(bundleContext, result::set);
- opA = probeA.getOperation();
+ opA = probeA.getPublisher();
sentA = opA.apply("Hello");
- opB = probeBreference.get().getOperation();
+ opB = probeBreference.get().getPublisher();
sentB = opB.apply(", World");
assertEquals("Hello, World", result.get());
@@ -117,7 +115,7 @@ public class ProbeTests {
program.run(bundleContext, result::set);
assertEquals(0, result.get());
- Function<Integer, Runnable> opA = probeA.getOperation();
+ Function<Integer, Runnable> opA = probeA.getPublisher();
Runnable sentA = opA.apply(5);
assertEquals(15, result.get());
Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java Fri Nov 17 16:00:41 2017
@@ -19,7 +19,6 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
import org.apache.aries.osgi.functional.internal.ProbeImpl;
import org.junit.Ignore;
import org.junit.Test;
@@ -337,9 +336,9 @@ public class AsynchronousTest {
result.start();
- Function<Integer, Runnable> opa = ((ProbeImpl<Integer>) as).getOperation();
- Function<Integer, Runnable> opb = ((ProbeImpl<Integer>) bs).getOperation();
- Function<Integer, Runnable> opc = ((ProbeImpl<Integer>) cs).getOperation();
+ Function<Integer, Runnable> opa = ((ProbeImpl<Integer>) as).getPublisher();
+ Function<Integer, Runnable> opb = ((ProbeImpl<Integer>) bs).getPublisher();
+ Function<Integer, Runnable> opc = ((ProbeImpl<Integer>) cs).getPublisher();
ExecutorService executor = Executors.newFixedThreadPool(8);
Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java Fri Nov 17 16:00:41 2017
@@ -23,6 +23,7 @@ import org.apache.aries.osgi.functional.
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java Fri Nov 17 16:00:41 2017
@@ -712,7 +712,7 @@ public class DSLTest {
once.run(bundleContext);
- Function<Integer, Runnable> op = probe.getOperation();
+ Function<Integer, Runnable> op = probe.getPublisher();
assertEquals(0, count.get());