You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/11/17 15:59:48 UTC
svn commit: r1815577 - in /aries/trunk/component-dsl:
component-dsl/src/main/java/org/apache/aries/osgi/functional/
component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/
itests/src/main/java/org/apache/aries/osgi/functional/test/
Author: csierra
Date: Fri Nov 17 15:59:48 2017
New Revision: 1815577
URL: http://svn.apache.org/viewvc?rev=1815577&view=rev
Log:
[Component-DSL] Error handling
Adding some primitives that help recover from errors
Modified:
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Fri Nov 17 15:59:48 2017
@@ -47,6 +47,7 @@ import java.util.Dictionary;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -58,6 +59,10 @@ import java.util.function.Supplier;
public interface OSGi<T> extends OSGiRunnable<T> {
Runnable NOOP = () -> {};
+ OSGi<T> recover(BiFunction<T, Exception, T> onError);
+
+ OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
+
OSGi<T> effects(
Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
@@ -254,8 +259,6 @@ public interface OSGi<T> extends OSGiRun
return new ServiceReferenceOSGi<>(filterString, null, onModified);
}
-
-
@SafeVarargs
static <T> OSGi<T> all(OSGi<T> ... programs) {
return new DistributeOSGi<>(programs);
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java Fri Nov 17 15:59:48 2017
@@ -44,7 +44,12 @@ public class DistributeOSGi<T> extends O
bundleContext, op)).
collect(Collectors.toList()));
- results.forEach(OSGiResult::start);
+ results.forEach(osGiResult -> {
+ try {
+ osGiResult.start();
+ }
+ catch (Exception e) {}
+ });
},
() -> {
for (OSGiResult result : results) {
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Fri Nov 17 15:59:48 2017
@@ -26,6 +26,7 @@ import org.osgi.framework.InvalidSyntaxE
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -60,6 +61,46 @@ public class OSGiImpl<T> implements OSGi
}
@Override
+ public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
+ return new OSGiImpl<>((bundleContext, op) ->
+ _operation.run(
+ bundleContext,
+ t -> {
+ try {
+ return op.apply(t);
+ }
+ catch (Exception e) {
+ return op.apply(onError.apply(t, e));
+ }
+ }
+ ));
+ }
+
+ @Override
+ public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
+ return new OSGiImpl<>((bundleContext, op) ->
+ _operation.run(
+ bundleContext,
+ t -> {
+ try {
+ return op.apply(t);
+ }
+ catch (Exception e) {
+ OSGi<T> errorProgram = onError.apply(t, e);
+
+ OSGiResult result =
+ ((OSGiImpl<T>) errorProgram)._operation.run(
+ bundleContext, op);
+
+ result.start();
+
+ return result::close;
+ }
+ }
+ ));
+ }
+
+ @Override
public OSGi<T> effects(
Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
@@ -69,7 +110,15 @@ public class OSGiImpl<T> implements OSGi
t -> {
onAdded.accept(t);
- Runnable terminator = op.apply(t);
+ Runnable terminator;
+ try {
+ terminator = op.apply(t);
+ }
+ catch (Exception e) {
+ onRemoved.accept(t);
+
+ throw e;
+ }
return () -> {
onRemoved.accept(t);
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java Fri Nov 17 15:59:48 2017
@@ -19,10 +19,8 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.OSGiOperation;
import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
import org.osgi.framework.BundleContext;
-import java.util.function.Consumer;
import java.util.function.Function;
/**
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Fri Nov 17 15:59:48 2017
@@ -43,14 +43,9 @@ public class OSGiResultImpl implements O
if (_working.compareAndSet(false, true)) {
if (!_started && !_closed.get()) {
- try {
- start.run();
+ start.run();
- _started = true;
- }
- catch (Exception e) {
- e.printStackTrace();
- }
+ _started = true;
}
_working.set(false);
@@ -65,12 +60,7 @@ public class OSGiResultImpl implements O
}
if (_closed.compareAndSet(false, true) && _started) {
- try {
- close.run();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
+ close.run();
}
_working.set(false);
Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java Fri Nov 17 15:59:48 2017
@@ -29,7 +29,16 @@ public class OnCloseOSGiImpl extends OSG
AtomicReference<Runnable> reference = new AtomicReference<>();
return new OSGiResultImpl(
- () -> reference.set(op.apply(null)),
+ () -> {
+ try {
+ reference.set(op.apply(null));
+ }
+ catch (Exception e) {
+ action.run();
+
+ throw e;
+ }
+ },
() -> {
action.run();
Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java Fri Nov 17 15:59:48 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
import org.apache.aries.osgi.functional.CachingServiceReference;
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.Refresher;
import org.apache.aries.osgi.functional.SentEvent;
import org.apache.aries.osgi.functional.internal.ProbeImpl;
import org.junit.Test;
@@ -50,6 +49,7 @@ import java.util.function.Function;
import static org.apache.aries.osgi.functional.OSGi.configuration;
import static org.apache.aries.osgi.functional.OSGi.configurations;
import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.OSGi.nothing;
import static org.apache.aries.osgi.functional.OSGi.onClose;
import static org.apache.aries.osgi.functional.OSGi.once;
import static org.apache.aries.osgi.functional.OSGi.register;
@@ -710,6 +710,128 @@ public class DSLTest {
}
}
+ @Test
+ public void testRecover() {
+ ArrayList<Object> result = new ArrayList<>();
+ ArrayList<Object> arrived = new ArrayList<>();
+ ArrayList<Object> left = new ArrayList<>();
+
+ OSGi<Integer> program = just(
+ Arrays.asList(1, 2, 3, 4, 5, 6)
+ ).recover(
+ (__, e) -> 0
+ ).effects(
+ arrived::add, left::add
+ ).
+ effects(
+ t -> {
+ if (t % 2 != 0) {
+ throw new RuntimeException();
+ }
+ }
+ , __ -> {}
+ );
+
+ try (OSGiResult run = program.run(bundleContext, result::add)) {
+ assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+ assertEquals(Arrays.asList(1, 0, 2, 3, 0, 4, 5, 0, 6), arrived);
+ assertEquals(Arrays.asList(1, 3, 5), left);
+
+ arrived.removeAll(left);
+ assertEquals(arrived, result);
+ }
+ }
+
+ @Test
+ public void testRecoverWith() {
+ ArrayList<Object> result = new ArrayList<>();
+ ArrayList<Object> arrived = new ArrayList<>();
+ ArrayList<Object> left = new ArrayList<>();
+
+ OSGi<Integer> program = just(
+ Arrays.asList(1, 2, 3, 4, 5, 6)
+ ).recoverWith(
+ (__, e) -> just(0)
+ ).effects(
+ arrived::add, left::add
+ ).effects(
+ t -> {
+ if (t % 2 != 0) {
+ throw new RuntimeException();
+ }
+ }
+ , __ -> {}
+ );
+
+ try (OSGiResult run = program.run(bundleContext, result::add)) {
+ assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+ assertEquals(Arrays.asList(1, 0, 2, 3, 0, 4, 5, 0, 6), arrived);
+ assertEquals(Arrays.asList(1, 3, 5), left);
+
+ arrived.removeAll(left);
+ assertEquals(arrived, result);
+ }
+ }
+
+ @Test
+ public void testOnCloseWithError() {
+ ArrayList<Object> result = new ArrayList<>();
+ ArrayList<Object> left = new ArrayList<>();
+
+ OSGi<Integer> program = just(
+ Arrays.asList(1, 2, 3, 4, 5, 6)
+ ).recoverWith(
+ (__, e) -> just(0)
+ ).flatMap(t ->
+ onClose(() -> left.add(t)).then(just(t))
+ ).
+ flatMap(t -> {
+ if (t % 2 != 0) {
+ throw new RuntimeException();
+ }
+
+ return just(t);
+ });
+
+ try (OSGiResult run = program.run(bundleContext, result::add)) {
+ assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+ assertEquals(Arrays.asList(1, 3, 5), left);
+ }
+ }
+
+ /*@Test
+ public void testRouteWithError() {
+ ArrayList<Object> result = new ArrayList<>();
+ ArrayList<Object> left = new ArrayList<>();
+
+ OSGi<Integer> program = just(
+ Arrays.asList(1, 2, 3, 4, 5, 6)
+ ).recoverWith(
+ (__, e) -> just(0)
+ ).route(router -> {
+ AtomicReference<SentEvent<Integer>> sentEvent =
+ new AtomicReference<>();
+
+ router.onIncoming(event -> {
+ sentEvent.set(router.signalAdd(event));
+ });
+ router.onLeaving(__ -> sentEvent.get().terminate());
+ }).
+ effects(__ -> {}, left::add).
+ flatMap(t -> {
+ if (t % 2 != 0) {
+ throw new RuntimeException();
+ }
+
+ return just(t);
+ });
+
+ try (OSGiResult run = program.run(bundleContext, result::add)) {
+ assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+ assertEquals(Arrays.asList(1, 3, 5), left);
+ }
+ }*/
+
private class Service {}
}