You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/08/17 22:59:08 UTC

[pulsar] branch master updated: When the action has no 'continueOn', continue to invoke subsequent actions. (#11679)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new efe5109b When the action has no 'continueOn', continue to invoke subsequent actions. (#11679)
efe5109b is described below

commit efe5109bc4afcf803d802cf195ef226cd31fb8ba
Author: Bharani Chadalavada <bh...@gmail.com>
AuthorDate: Tue Aug 17 15:58:12 2021 -0700

    When the action has no 'continueOn', continue to invoke subsequent actions. (#11679)
    
    Co-authored-by: Bharani Chadalavada <bc...@splunk.com>
---
 .../org/apache/pulsar/functions/utils/Actions.java |  13 +-
 .../apache/pulsar/functions/utils/ActionsTest.java | 201 +++++++++++++--------
 2 files changed, 131 insertions(+), 83 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
index a451c08..05a6356 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
@@ -94,12 +94,13 @@ public class Actions {
                 log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e);
                 success = false;
             }
-            if (action.getContinueOn() != null
-                    && success == action.getContinueOn()) {
-                continue;
-            } else {
-                // terminate
-                break;
+            if (action.getContinueOn() != null) {
+                if (success == action.getContinueOn()) {
+                    continue;
+                } else {
+                    // terminate
+                    break;
+                }
             }
         }
     }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
index f728f67..904b6e3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
@@ -34,7 +34,7 @@ import static org.testng.Assert.assertEquals;
 public class ActionsTest {
 
     @Test
-    public void testActions() throws InterruptedException {
+    public void testActionsSuccess() throws InterruptedException {
 
         // Test for success
         Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
@@ -47,27 +47,27 @@ public class ActionsTest {
         java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
 
         Actions.Action action1 = spy(
-                Actions.Action.builder()
-                        .actionName("action1")
-                        .numRetries(10)
-                        .sleepBetweenInvocationsMs(100)
-                        .supplier(supplier1)
-                        .continueOn(true)
-                        .onFail(onFail)
-                        .onSuccess(onSucess)
-                        .build());
+            Actions.Action.builder()
+                .actionName("action1")
+                .numRetries(10)
+                .sleepBetweenInvocationsMs(100)
+                .supplier(supplier1)
+                .continueOn(true)
+                .onFail(onFail)
+                .onSuccess(onSucess)
+                .build());
 
         Actions.Action action2 = spy(
-                Actions.Action.builder()
-                        .actionName("action2")
-                        .numRetries(20)
-                        .sleepBetweenInvocationsMs(200)
-                        .supplier(supplier2)
-                        .build());
+            Actions.Action.builder()
+                .actionName("action2")
+                .numRetries(20)
+                .sleepBetweenInvocationsMs(200)
+                .supplier(supplier2)
+                .build());
 
         Actions actions = Actions.newBuilder()
-                .addAction(action1)
-                .addAction(action2);
+            .addAction(action1)
+            .addAction(action2);
         actions.run();
 
         assertEquals(actions.numActions(), 2);
@@ -75,42 +75,44 @@ public class ActionsTest {
         verify(onFail, times(0)).accept(any());
         verify(onSucess, times(1)).accept(any());
         verify(supplier2, times(1)).get();
+    }
 
-        // test only run 1 action
 
-        supplier1 = mock(Supplier.class);
+    @Test
+    public void testActionsOneAction() throws InterruptedException {
+        // test only run 1 action
+        Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
         when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
-        supplier2 = mock(Supplier.class);
+        Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class);
         when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
-        onFail = mock(java.util.function.Consumer.class);
-        onSucess = mock(java.util.function.Consumer.class);
-
-        action1 = spy(
-                Actions.Action.builder()
-                        .actionName("action1")
-                        .numRetries(10)
-                        .sleepBetweenInvocationsMs(100)
-                        .supplier(supplier1)
-                        .continueOn(false)
-                        .onFail(onFail)
-                        .onSuccess(onSucess)
-                        .build());
-
-        action2 = spy(
-                Actions.Action.builder()
-                        .actionName("action2")
-                        .numRetries(20)
-                        .sleepBetweenInvocationsMs(200)
-                        .supplier(supplier2)
-                        .onFail(onFail)
-                        .onSuccess(onSucess)
-                        .build());
-
-        actions = Actions.newBuilder()
-                .addAction(action1)
-                .addAction(action2);
+        java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class);
+        java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
+
+        Actions.Action action1 = spy(
+            Actions.Action.builder()
+                .actionName("action1")
+                .numRetries(10)
+                .sleepBetweenInvocationsMs(100)
+                .supplier(supplier1)
+                .continueOn(false)
+                .onFail(onFail)
+                .onSuccess(onSucess)
+                .build());
+        Actions.Action action2 = spy(
+            Actions.Action.builder()
+                .actionName("action2")
+                .numRetries(20)
+                .sleepBetweenInvocationsMs(200)
+                .supplier(supplier2)
+                .onFail(onFail)
+                .onSuccess(onSucess)
+                .build());
+
+        Actions actions = Actions.newBuilder()
+            .addAction(action1)
+            .addAction(action2);
         actions.run();
 
         assertEquals(actions.numActions(), 2);
@@ -118,40 +120,44 @@ public class ActionsTest {
         verify(onFail, times(0)).accept(any());
         verify(onSucess, times(1)).accept(any());
         verify(supplier2, times(0)).get();
+    }
 
-        // test retry
+    @Test
+    public void testActionsRetry() throws InterruptedException {
+
+      // test retry
 
-        supplier1 = mock(Supplier.class);
+        Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
         when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(false).build());
 
-        supplier2 = mock(Supplier.class);
+        Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class);
         when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
 
-        onFail = mock(java.util.function.Consumer.class);
-        onSucess = mock(java.util.function.Consumer.class);
-
-        action1 = spy(
-                Actions.Action.builder()
-                        .actionName("action1")
-                        .numRetries(10)
-                        .sleepBetweenInvocationsMs(10)
-                        .supplier(supplier1)
-                        .continueOn(false)
-                        .onFail(onFail)
-                        .onSuccess(onSucess)
-                        .build());
-
-        action2 = spy(
-                Actions.Action.builder()
-                        .actionName("action2")
-                        .numRetries(20)
-                        .sleepBetweenInvocationsMs(200)
-                        .supplier(supplier2)
-                        .build());
-
-        actions = Actions.newBuilder()
-                .addAction(action1)
-                .addAction(action2);
+        java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class);
+        java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
+
+        Actions.Action action1 = spy(
+            Actions.Action.builder()
+                .actionName("action1")
+                .numRetries(10)
+                .sleepBetweenInvocationsMs(10)
+                .supplier(supplier1)
+                .continueOn(false)
+                .onFail(onFail)
+                .onSuccess(onSucess)
+                .build());
+
+        Actions.Action action2 = spy(
+            Actions.Action.builder()
+                .actionName("action2")
+                .numRetries(20)
+                .sleepBetweenInvocationsMs(200)
+                .supplier(supplier2)
+                .build());
+
+        Actions actions = Actions.newBuilder()
+            .addAction(action1)
+            .addAction(action2);
         actions.run();
 
         assertEquals(actions.numActions(), 2);
@@ -159,6 +165,47 @@ public class ActionsTest {
         verify(onFail, times(1)).accept(any());
         verify(onSucess, times(0)).accept(any());
         verify(supplier2, times(1)).get();
+    }
 
+  @Test
+  public void testActionsNoContinueOn() throws InterruptedException {
+      // No continueOn
+      Supplier<Actions.ActionResult>supplier1 = mock(Supplier.class);
+      when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
+
+      Supplier<Actions.ActionResult>    supplier2 = mock(Supplier.class);
+      when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
+
+      java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class);
+      java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
+
+      Actions.Action action1 = spy(
+          Actions.Action.builder()
+              .actionName("action1")
+              .numRetries(10)
+              .sleepBetweenInvocationsMs(100)
+              .supplier(supplier1)
+              .onFail(onFail)
+              .onSuccess(onSucess)
+              .build());
+
+      Actions.Action action2 = spy(
+          Actions.Action.builder()
+              .actionName("action2")
+              .numRetries(20)
+              .sleepBetweenInvocationsMs(200)
+              .supplier(supplier2)
+              .build());
+
+      Actions actions = Actions.newBuilder()
+          .addAction(action1)
+          .addAction(action2);
+      actions.run();
+
+      assertEquals(actions.numActions(), 2);
+      verify(supplier1, times(1)).get();
+      verify(onFail, times(0)).accept(any());
+      verify(onSucess, times(1)).accept(any());
+      verify(supplier2, times(1)).get();
     }
-}
+}
\ No newline at end of file