You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2017/11/17 15:59:48 UTC

svn commit: r1815577 - in /aries/trunk/component-dsl: component-dsl/src/main/java/org/apache/aries/osgi/functional/ component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ itests/src/main/java/org/apache/aries/osgi/functional/test/

Author: csierra
Date: Fri Nov 17 15:59:48 2017
New Revision: 1815577

URL: http://svn.apache.org/viewvc?rev=1815577&view=rev
Log:
[Component-DSL] Error handling

Adding some primitives that help recovering from errors

Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java Fri Nov 17 15:59:48 2017
@@ -47,6 +47,7 @@ import java.util.Dictionary;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -58,6 +59,10 @@ import java.util.function.Supplier;
 public interface OSGi<T> extends OSGiRunnable<T> {
 	Runnable NOOP = () -> {};
 
+	OSGi<T> recover(BiFunction<T, Exception, T> onError);
+
+	OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
+
 	OSGi<T> effects(
 		Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
 
@@ -254,8 +259,6 @@ public interface OSGi<T> extends OSGiRun
 		return new ServiceReferenceOSGi<>(filterString, null, onModified);
 	}
 
-
-
 	@SafeVarargs
 	static <T> OSGi<T> all(OSGi<T> ... programs) {
 		return new DistributeOSGi<>(programs);

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java Fri Nov 17 15:59:48 2017
@@ -44,7 +44,12 @@ public class DistributeOSGi<T> extends O
                                 bundleContext, op)).
                             collect(Collectors.toList()));
 
-                    results.forEach(OSGiResult::start);
+                    results.forEach(osGiResult -> {
+                        try {
+                            osGiResult.start();
+                        }
+                        catch (Exception e) {}
+                    });
                 },
                 () -> {
                     for (OSGiResult result : results) {

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java Fri Nov 17 15:59:48 2017
@@ -26,6 +26,7 @@ import org.osgi.framework.InvalidSyntaxE
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -60,6 +61,46 @@ public class OSGiImpl<T> implements OSGi
 	}
 
 	@Override
+	public OSGi<T> recover(BiFunction<T, Exception, T> onError) {
+		return new OSGiImpl<>((bundleContext, op) ->
+			_operation.run(
+				bundleContext,
+				t -> {
+					try {
+						return op.apply(t);
+					}
+					catch (Exception e) {
+						return op.apply(onError.apply(t, e));
+					}
+				}
+			));
+	}
+
+	@Override
+	public OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError) {
+		return new OSGiImpl<>((bundleContext, op) ->
+			_operation.run(
+				bundleContext,
+				t -> {
+					try {
+						return op.apply(t);
+					}
+					catch (Exception e) {
+						OSGi<T> errorProgram = onError.apply(t, e);
+
+						OSGiResult result =
+							((OSGiImpl<T>) errorProgram)._operation.run(
+								bundleContext, op);
+
+						result.start();
+
+						return result::close;
+					}
+				}
+			));
+	}
+
+	@Override
 	public OSGi<T> effects(
 		Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
@@ -69,7 +110,15 @@ public class OSGiImpl<T> implements OSGi
 				t -> {
 					onAdded.accept(t);
 
-					Runnable terminator = op.apply(t);
+					Runnable terminator;
+					try {
+						terminator = op.apply(t);
+					}
+					catch (Exception e) {
+						onRemoved.accept(t);
+
+						throw e;
+					}
 
 					return () -> {
 						onRemoved.accept(t);

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java Fri Nov 17 15:59:48 2017
@@ -19,10 +19,8 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiOperation;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.osgi.framework.BundleContext;
 
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Fri Nov 17 15:59:48 2017
@@ -43,14 +43,9 @@ public class OSGiResultImpl implements O
 		if (_working.compareAndSet(false, true)) {
 
 			if (!_started && !_closed.get()) {
-				try {
-					start.run();
+				start.run();
 
-					_started = true;
-				}
-				catch (Exception e) {
-					e.printStackTrace();
-				}
+				_started = true;
 			}
 
 			_working.set(false);
@@ -65,12 +60,7 @@ public class OSGiResultImpl implements O
 		}
 
 		if (_closed.compareAndSet(false, true) && _started) {
-			try {
-				close.run();
-			}
-			catch (Exception e) {
-				e.printStackTrace();
-			}
+			close.run();
 		}
 
 		_working.set(false);

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java Fri Nov 17 15:59:48 2017
@@ -29,7 +29,16 @@ public class OnCloseOSGiImpl extends OSG
 			AtomicReference<Runnable> reference = new AtomicReference<>();
 
 			return new OSGiResultImpl(
-				() -> reference.set(op.apply(null)),
+				() -> {
+					try {
+						reference.set(op.apply(null));
+					}
+					catch (Exception e) {
+						action.run();
+
+						throw e;
+					}
+				},
 				() -> {
 					action.run();
 

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815577&r1=1815576&r2=1815577&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java Fri Nov 17 15:59:48 2017
@@ -20,7 +20,6 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.CachingServiceReference;
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.Refresher;
 import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.internal.ProbeImpl;
 import org.junit.Test;
@@ -50,6 +49,7 @@ import java.util.function.Function;
 import static org.apache.aries.osgi.functional.OSGi.configuration;
 import static org.apache.aries.osgi.functional.OSGi.configurations;
 import static org.apache.aries.osgi.functional.OSGi.just;
+import static org.apache.aries.osgi.functional.OSGi.nothing;
 import static org.apache.aries.osgi.functional.OSGi.onClose;
 import static org.apache.aries.osgi.functional.OSGi.once;
 import static org.apache.aries.osgi.functional.OSGi.register;
@@ -710,6 +710,128 @@ public class DSLTest {
         }
     }
 
+    @Test
+    public void testRecover() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> arrived = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recover(
+            (__, e) -> 0
+        ).effects(
+            arrived::add, left::add
+        ).
+        effects(
+            t -> {
+                if (t % 2 != 0) {
+                    throw new RuntimeException();
+                }
+            }
+            , __ -> {}
+        );
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 0, 2, 3, 0, 4, 5, 0, 6), arrived);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+
+            arrived.removeAll(left);
+            assertEquals(arrived, result);
+        }
+    }
+
+    @Test
+    public void testRecoverWith() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> arrived = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recoverWith(
+            (__, e) -> just(0)
+        ).effects(
+            arrived::add, left::add
+        ).effects(
+            t -> {
+                if (t % 2 != 0) {
+                    throw new RuntimeException();
+                }
+            }
+            , __ -> {}
+        );
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 0, 2, 3, 0, 4, 5, 0, 6), arrived);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+
+            arrived.removeAll(left);
+            assertEquals(arrived, result);
+        }
+    }
+
+    @Test
+    public void testOnCloseWithError() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recoverWith(
+            (__, e) -> just(0)
+        ).flatMap(t ->
+            onClose(() -> left.add(t)).then(just(t))
+        ).
+        flatMap(t -> {
+            if (t % 2 != 0) {
+                throw new RuntimeException();
+            }
+
+            return just(t);
+        });
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+        }
+    }
+
+    /*@Test
+    public void testRouteWithError() {
+        ArrayList<Object> result = new ArrayList<>();
+        ArrayList<Object> left = new ArrayList<>();
+
+        OSGi<Integer> program = just(
+            Arrays.asList(1, 2, 3, 4, 5, 6)
+        ).recoverWith(
+            (__, e) -> just(0)
+        ).route(router -> {
+            AtomicReference<SentEvent<Integer>> sentEvent =
+                new AtomicReference<>();
+
+            router.onIncoming(event -> {
+                sentEvent.set(router.signalAdd(event));
+            });
+            router.onLeaving(__ -> sentEvent.get().terminate());
+        }).
+            effects(__ -> {}, left::add).
+            flatMap(t -> {
+                if (t % 2 != 0) {
+                    throw new RuntimeException();
+                }
+
+                return just(t);
+            });
+
+        try (OSGiResult run = program.run(bundleContext, result::add)) {
+            assertEquals(Arrays.asList(0, 2, 0, 4, 0, 6), result);
+            assertEquals(Arrays.asList(1, 3, 5), left);
+        }
+    }*/
+
     private class Service {}
 
 }