You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/08/17 22:59:08 UTC
[pulsar] branch master updated: When the action has no 'continueOn', continue to invoke subsequent actions. (#11679)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new efe5109b When the action has no 'continueOn', continue to invoke subsequent actions. (#11679)
efe5109b is described below
commit efe5109bc4afcf803d802cf195ef226cd31fb8ba
Author: Bharani Chadalavada <bh...@gmail.com>
AuthorDate: Tue Aug 17 15:58:12 2021 -0700
When the action has no 'continueOn', continue to invoke subsequent actions. (#11679)
Co-authored-by: Bharani Chadalavada <bc...@splunk.com>
---
.../org/apache/pulsar/functions/utils/Actions.java | 13 +-
.../apache/pulsar/functions/utils/ActionsTest.java | 201 +++++++++++++--------
2 files changed, 131 insertions(+), 83 deletions(-)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
index a451c08..05a6356 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
@@ -94,12 +94,13 @@ public class Actions {
log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e);
success = false;
}
- if (action.getContinueOn() != null
- && success == action.getContinueOn()) {
- continue;
- } else {
- // terminate
- break;
+ if (action.getContinueOn() != null) {
+ if (success == action.getContinueOn()) {
+ continue;
+ } else {
+ // terminate
+ break;
+ }
}
}
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
index f728f67..904b6e3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
@@ -34,7 +34,7 @@ import static org.testng.Assert.assertEquals;
public class ActionsTest {
@Test
- public void testActions() throws InterruptedException {
+ public void testActionsSuccess() throws InterruptedException {
// Test for success
Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
@@ -47,27 +47,27 @@ public class ActionsTest {
java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
Actions.Action action1 = spy(
- Actions.Action.builder()
- .actionName("action1")
- .numRetries(10)
- .sleepBetweenInvocationsMs(100)
- .supplier(supplier1)
- .continueOn(true)
- .onFail(onFail)
- .onSuccess(onSucess)
- .build());
+ Actions.Action.builder()
+ .actionName("action1")
+ .numRetries(10)
+ .sleepBetweenInvocationsMs(100)
+ .supplier(supplier1)
+ .continueOn(true)
+ .onFail(onFail)
+ .onSuccess(onSucess)
+ .build());
Actions.Action action2 = spy(
- Actions.Action.builder()
- .actionName("action2")
- .numRetries(20)
- .sleepBetweenInvocationsMs(200)
- .supplier(supplier2)
- .build());
+ Actions.Action.builder()
+ .actionName("action2")
+ .numRetries(20)
+ .sleepBetweenInvocationsMs(200)
+ .supplier(supplier2)
+ .build());
Actions actions = Actions.newBuilder()
- .addAction(action1)
- .addAction(action2);
+ .addAction(action1)
+ .addAction(action2);
actions.run();
assertEquals(actions.numActions(), 2);
@@ -75,42 +75,44 @@ public class ActionsTest {
verify(onFail, times(0)).accept(any());
verify(onSucess, times(1)).accept(any());
verify(supplier2, times(1)).get();
+ }
- // test only run 1 action
- supplier1 = mock(Supplier.class);
+ @Test
+ public void testActionsOneAction() throws InterruptedException {
+ // test only run 1 action
+ Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
- supplier2 = mock(Supplier.class);
+ Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class);
when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
- onFail = mock(java.util.function.Consumer.class);
- onSucess = mock(java.util.function.Consumer.class);
-
- action1 = spy(
- Actions.Action.builder()
- .actionName("action1")
- .numRetries(10)
- .sleepBetweenInvocationsMs(100)
- .supplier(supplier1)
- .continueOn(false)
- .onFail(onFail)
- .onSuccess(onSucess)
- .build());
-
- action2 = spy(
- Actions.Action.builder()
- .actionName("action2")
- .numRetries(20)
- .sleepBetweenInvocationsMs(200)
- .supplier(supplier2)
- .onFail(onFail)
- .onSuccess(onSucess)
- .build());
-
- actions = Actions.newBuilder()
- .addAction(action1)
- .addAction(action2);
+ java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class);
+ java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
+
+ Actions.Action action1 = spy(
+ Actions.Action.builder()
+ .actionName("action1")
+ .numRetries(10)
+ .sleepBetweenInvocationsMs(100)
+ .supplier(supplier1)
+ .continueOn(false)
+ .onFail(onFail)
+ .onSuccess(onSucess)
+ .build());
+ Actions.Action action2 = spy(
+ Actions.Action.builder()
+ .actionName("action2")
+ .numRetries(20)
+ .sleepBetweenInvocationsMs(200)
+ .supplier(supplier2)
+ .onFail(onFail)
+ .onSuccess(onSucess)
+ .build());
+
+ Actions actions = Actions.newBuilder()
+ .addAction(action1)
+ .addAction(action2);
actions.run();
assertEquals(actions.numActions(), 2);
@@ -118,40 +120,44 @@ public class ActionsTest {
verify(onFail, times(0)).accept(any());
verify(onSucess, times(1)).accept(any());
verify(supplier2, times(0)).get();
+ }
- // test retry
+ @Test
+ public void testActionsRetry() throws InterruptedException {
+
+ // test retry
- supplier1 = mock(Supplier.class);
+ Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(false).build());
- supplier2 = mock(Supplier.class);
+ Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class);
when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
- onFail = mock(java.util.function.Consumer.class);
- onSucess = mock(java.util.function.Consumer.class);
-
- action1 = spy(
- Actions.Action.builder()
- .actionName("action1")
- .numRetries(10)
- .sleepBetweenInvocationsMs(10)
- .supplier(supplier1)
- .continueOn(false)
- .onFail(onFail)
- .onSuccess(onSucess)
- .build());
-
- action2 = spy(
- Actions.Action.builder()
- .actionName("action2")
- .numRetries(20)
- .sleepBetweenInvocationsMs(200)
- .supplier(supplier2)
- .build());
-
- actions = Actions.newBuilder()
- .addAction(action1)
- .addAction(action2);
+ java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class);
+ java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
+
+ Actions.Action action1 = spy(
+ Actions.Action.builder()
+ .actionName("action1")
+ .numRetries(10)
+ .sleepBetweenInvocationsMs(10)
+ .supplier(supplier1)
+ .continueOn(false)
+ .onFail(onFail)
+ .onSuccess(onSucess)
+ .build());
+
+ Actions.Action action2 = spy(
+ Actions.Action.builder()
+ .actionName("action2")
+ .numRetries(20)
+ .sleepBetweenInvocationsMs(200)
+ .supplier(supplier2)
+ .build());
+
+ Actions actions = Actions.newBuilder()
+ .addAction(action1)
+ .addAction(action2);
actions.run();
assertEquals(actions.numActions(), 2);
@@ -159,6 +165,47 @@ public class ActionsTest {
verify(onFail, times(1)).accept(any());
verify(onSucess, times(0)).accept(any());
verify(supplier2, times(1)).get();
+ }
+ @Test
+ public void testActionsNoContinueOn() throws InterruptedException {
+ // No continueOn
+ Supplier<Actions.ActionResult>supplier1 = mock(Supplier.class);
+ when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
+
+ Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class);
+ when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
+
+ java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class);
+ java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
+
+ Actions.Action action1 = spy(
+ Actions.Action.builder()
+ .actionName("action1")
+ .numRetries(10)
+ .sleepBetweenInvocationsMs(100)
+ .supplier(supplier1)
+ .onFail(onFail)
+ .onSuccess(onSucess)
+ .build());
+
+ Actions.Action action2 = spy(
+ Actions.Action.builder()
+ .actionName("action2")
+ .numRetries(20)
+ .sleepBetweenInvocationsMs(200)
+ .supplier(supplier2)
+ .build());
+
+ Actions actions = Actions.newBuilder()
+ .addAction(action1)
+ .addAction(action2);
+ actions.run();
+
+ assertEquals(actions.numActions(), 2);
+ verify(supplier1, times(1)).get();
+ verify(onFail, times(0)).accept(any());
+ verify(onSucess, times(1)).accept(any());
+ verify(supplier2, times(1)).get();
}
-}
+}
\ No newline at end of file