You are viewing a plain text version of this content. The link to the canonical version was a hyperlink that is not preserved in this text rendering.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/07 07:31:46 UTC
[pulsar] branch master updated: Retry creation of assignment topic
a few times before giving up (#3722)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fd05219 Retry creation of assignment topic a few times before giving up (#3722)
fd05219 is described below
commit fd05219a3593fdb69e2efd71bed3e5895be91ea8
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Mar 6 23:31:40 2019 -0800
Retry creation of assignment topic a few times before giving up (#3722)
* Retry creation of assignment topic a few times before giving up
* Use Action based retry mechanism
* Fix build
* Catch interrupted exception
* Fix unittest
* Added header
---
.../functions/runtime/KubernetesRuntime.java | 87 ++++++-------
.../pulsar/functions/runtime/RuntimeUtils.java | 105 ----------------
.../org/apache/pulsar/functions/utils/Actions.java | 139 +++++++++++++++++++++
.../pulsar/functions/utils/ActionsTest.java} | 70 ++++++-----
.../pulsar/functions/worker/FunctionActioner.java | 14 +--
.../pulsar/functions/worker/SchedulerManager.java | 71 ++++++-----
6 files changed, 264 insertions(+), 222 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 3fc0c69..3c0468d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Utils;
@@ -360,7 +361,7 @@ public class KubernetesRuntime implements Runtime {
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
- RuntimeUtils.Actions.Action createService = RuntimeUtils.Actions.Action.builder()
+ Actions.Action createService = Actions.Action.builder()
.actionName(String.format("Submitting service for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -372,25 +373,25 @@ public class KubernetesRuntime implements Runtime {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
log.warn("Service already present for function {}", fqfn);
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
})
.build();
AtomicBoolean success = new AtomicBoolean(false);
- RuntimeUtils.Actions.newBuilder()
+ Actions.newBuilder()
.addAction(createService.toBuilder()
- .onSuccess(() -> success.set(true))
+ .onSuccess((ignored) -> success.set(true))
.build())
.run();
@@ -432,7 +433,7 @@ public class KubernetesRuntime implements Runtime {
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
- RuntimeUtils.Actions.Action createStatefulSet = RuntimeUtils.Actions.Action.builder()
+ Actions.Action createStatefulSet = Actions.Action.builder()
.actionName(String.format("Submitting statefulset for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -444,25 +445,25 @@ public class KubernetesRuntime implements Runtime {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
log.warn("Statefulset already present for function {}", fqfn);
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
})
.build();
AtomicBoolean success = new AtomicBoolean(false);
- RuntimeUtils.Actions.newBuilder()
+ Actions.newBuilder()
.addAction(createStatefulSet.toBuilder()
- .onSuccess(() -> success.set(true))
+ .onSuccess((ignored) -> success.set(true))
.build())
.run();
@@ -479,7 +480,7 @@ public class KubernetesRuntime implements Runtime {
options.setPropagationPolicy("Foreground");
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
- RuntimeUtils.Actions.Action deleteStatefulSet = RuntimeUtils.Actions.Action.builder()
+ Actions.Action deleteStatefulSet = Actions.Action.builder()
.actionName(String.format("Deleting statefulset for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -498,16 +499,16 @@ public class KubernetesRuntime implements Runtime {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
log.warn("Statefulset for function {} does not exist", fqfn);
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
} catch (IOException e) {
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(e.getMessage())
.build();
@@ -516,9 +517,9 @@ public class KubernetesRuntime implements Runtime {
// if already deleted
if (response.code() == HTTP_NOT_FOUND) {
log.warn("Statefulset for function {} does not exist", fqfn);
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
} else {
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(response.isSuccessful())
.errorMsg(response.message())
.build();
@@ -527,7 +528,7 @@ public class KubernetesRuntime implements Runtime {
.build();
- RuntimeUtils.Actions.Action waitForStatefulSetDeletion = RuntimeUtils.Actions.Action.builder()
+ Actions.Action waitForStatefulSetDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
// set retry period to be about 2x the graceshutdown time
.numRetries(NUM_RETRIES * 2)
@@ -540,16 +541,16 @@ public class KubernetesRuntime implements Runtime {
} catch (ApiException e) {
// statefulset is gone
if (e.getCode() == HTTP_NOT_FOUND) {
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(response.getStatus().toString())
.build();
@@ -557,7 +558,7 @@ public class KubernetesRuntime implements Runtime {
.build();
// Need to wait for all pods to die so we can cleanup subscriptions.
- RuntimeUtils.Actions.Action waitForStatefulPodsToTerminate = RuntimeUtils.Actions.Action.builder()
+ Actions.Action waitForStatefulPodsToTerminate = Actions.Action.builder()
.actionName(String.format("Waiting for pods for function %s to terminate", fqfn))
.numRetries(NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS * 2)
@@ -575,19 +576,19 @@ public class KubernetesRuntime implements Runtime {
} catch (ApiException e) {
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
if (response.getItems().size() > 0) {
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(response.getItems().size() + " pods still alive.")
.build();
} else {
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(true)
.build();
}
@@ -596,19 +597,19 @@ public class KubernetesRuntime implements Runtime {
AtomicBoolean success = new AtomicBoolean(false);
- RuntimeUtils.Actions.newBuilder()
+ Actions.newBuilder()
.addAction(deleteStatefulSet.toBuilder()
.continueOn(true)
.build())
.addAction(waitForStatefulSetDeletion.toBuilder()
.continueOn(false)
- .onSuccess(() -> success.set(true))
+ .onSuccess((ignored) -> success.set(true))
.build())
.addAction(deleteStatefulSet.toBuilder()
.continueOn(true)
.build())
.addAction(waitForStatefulSetDeletion.toBuilder()
- .onSuccess(() -> success.set(true))
+ .onSuccess((ignored) -> success.set(true))
.build())
.run();
@@ -616,7 +617,7 @@ public class KubernetesRuntime implements Runtime {
throw new RuntimeException(String.format("Failed to delete statefulset for function %s", fqfn));
} else {
// wait for pods to terminate
- RuntimeUtils.Actions.newBuilder()
+ Actions.newBuilder()
.addAction(waitForStatefulPodsToTerminate)
.run();
}
@@ -630,7 +631,7 @@ public class KubernetesRuntime implements Runtime {
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
String serviceName = createJobName(instanceConfig.getFunctionDetails());
- RuntimeUtils.Actions.Action deleteService = RuntimeUtils.Actions.Action.builder()
+ Actions.Action deleteService = Actions.Action.builder()
.actionName(String.format("Deleting service for function %s", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -648,16 +649,16 @@ public class KubernetesRuntime implements Runtime {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
log.warn("Service for function {} does not exist", fqfn);
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
} catch (IOException e) {
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(e.getMessage())
.build();
@@ -666,9 +667,9 @@ public class KubernetesRuntime implements Runtime {
// if already deleted
if (response.code() == HTTP_NOT_FOUND) {
log.warn("Service for function {} does not exist", fqfn);
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
} else {
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(response.isSuccessful())
.errorMsg(response.message())
.build();
@@ -676,7 +677,7 @@ public class KubernetesRuntime implements Runtime {
})
.build();
- RuntimeUtils.Actions.Action waitForServiceDeletion = RuntimeUtils.Actions.Action.builder()
+ Actions.Action waitForServiceDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
.numRetries(NUM_RETRIES)
.sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
@@ -689,15 +690,15 @@ public class KubernetesRuntime implements Runtime {
} catch (ApiException e) {
// statefulset is gone
if (e.getCode() == HTTP_NOT_FOUND) {
- return RuntimeUtils.Actions.ActionResult.builder().success(true).build();
+ return Actions.ActionResult.builder().success(true).build();
}
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
}
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(response.getStatus().toString())
.build();
@@ -705,19 +706,19 @@ public class KubernetesRuntime implements Runtime {
.build();
AtomicBoolean success = new AtomicBoolean(false);
- RuntimeUtils.Actions.newBuilder()
+ Actions.newBuilder()
.addAction(deleteService.toBuilder()
.continueOn(true)
.build())
.addAction(waitForServiceDeletion.toBuilder()
.continueOn(false)
- .onSuccess(() -> success.set(true))
+ .onSuccess((ignored) -> success.set(true))
.build())
.addAction(deleteService.toBuilder()
.continueOn(true)
.build())
.addAction(waitForServiceDeletion.toBuilder()
- .onSuccess(() -> success.set(true))
+ .onSuccess((ignored) -> success.set(true))
.build())
.run();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 9862b0a..95f10ae 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -258,109 +258,4 @@ public class RuntimeUtils {
return result.toString();
}
- public static class Actions {
- private List<Action> actions = new LinkedList<>();
-
- @Data
- @Builder(toBuilder=true)
- public static class Action {
- private String actionName;
- private int numRetries = 1;
- private Supplier<ActionResult> supplier;
- private long sleepBetweenInvocationsMs = 500;
- private Boolean continueOn;
- private Runnable onFail;
- private Runnable onSuccess;
-
- public void verifyAction() {
- if (isBlank(actionName)) {
- throw new RuntimeException("Action name is empty!");
- }
- if (supplier == null) {
- throw new RuntimeException("Supplier is not specified!");
- }
- }
- }
-
- @Data
- @Builder
- public static class ActionResult {
- private boolean success;
- private String errorMsg;
- }
-
- private Actions() {
-
- }
-
-
- public Actions addAction(Action action) {
- action.verifyAction();
- this.actions.add(action);
- return this;
- }
-
- public static Actions newBuilder() {
- return new Actions();
- }
-
- public int numActions() {
- return actions.size();
- }
-
- public void run() throws InterruptedException {
- Iterator<Action> it = this.actions.iterator();
- while(it.hasNext()) {
- Action action = it.next();
-
- boolean success;
- try {
- success = runAction(action);
- } catch (Exception e) {
- log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e);
- success = false;
- }
- if (action.getContinueOn() != null
- && success == action.getContinueOn()) {
- continue;
- } else {
- // terminate
- break;
- }
- }
- }
-
- private boolean runAction(Action action) throws InterruptedException {
- for (int i = 0; i< action.getNumRetries(); i++) {
-
- ActionResult actionResult = action.getSupplier().get();
-
- if (actionResult.isSuccess()) {
- log.info("Sucessfully completed action [ {} ]", action.getActionName());
- if (action.getOnSuccess() != null) {
- action.getOnSuccess().run();
- }
- return true;
- } else {
- if (actionResult.getErrorMsg() != null) {
- log.warn("Error completing action [ {} ] :- {} - [ATTEMPT] {}/{}",
- action.getActionName(),
- actionResult.getErrorMsg(),
- i + 1, action.getNumRetries());
- } else {
- log.warn("Error completing action [ {} ] [ATTEMPT] {}/{}",
- action.getActionName(),
- i + 1, action.getNumRetries());
- }
-
- Thread.sleep(action.sleepBetweenInvocationsMs);
- }
- }
- log.error("Failed completing action [ {} ]. Giving up!", action.getActionName());
- if (action.getOnFail() != null) {
- action.getOnFail().run();
- }
- return false;
- }
- }
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
new file mode 100644
index 0000000..a451c08
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Actions.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+@Slf4j
+public class Actions {
+ private List<Action> actions = new LinkedList<>();
+
+ @Data
+ @Builder(toBuilder=true)
+ public static class Action {
+ private String actionName;
+ private int numRetries = 1;
+ private Supplier<ActionResult> supplier;
+ private long sleepBetweenInvocationsMs = 500;
+ private Boolean continueOn;
+ private Consumer<ActionResult> onFail;
+ private Consumer<ActionResult> onSuccess;
+
+ public void verifyAction() {
+ if (isBlank(actionName)) {
+ throw new RuntimeException("Action name is empty!");
+ }
+ if (supplier == null) {
+ throw new RuntimeException("Supplier is not specified!");
+ }
+ }
+ }
+
+ @Data
+ @Builder
+ public static class ActionResult {
+ private boolean success;
+ private String errorMsg;
+ private Object result;
+ }
+
+ private Actions() {
+
+ }
+
+
+ public Actions addAction(Action action) {
+ action.verifyAction();
+ this.actions.add(action);
+ return this;
+ }
+
+ public static Actions newBuilder() {
+ return new Actions();
+ }
+
+ public int numActions() {
+ return actions.size();
+ }
+
+ public void run() throws InterruptedException {
+ Iterator<Action> it = this.actions.iterator();
+ while(it.hasNext()) {
+ Action action = it.next();
+
+ boolean success;
+ try {
+ success = runAction(action);
+ } catch (Exception e) {
+ log.error("Uncaught exception thrown when running action [ {} ]:", action.getActionName(), e);
+ success = false;
+ }
+ if (action.getContinueOn() != null
+ && success == action.getContinueOn()) {
+ continue;
+ } else {
+ // terminate
+ break;
+ }
+ }
+ }
+
+ private boolean runAction(Action action) throws InterruptedException {
+ for (int i = 0; i< action.getNumRetries(); i++) {
+
+ ActionResult actionResult = action.getSupplier().get();
+
+ if (actionResult.isSuccess()) {
+ log.info("Sucessfully completed action [ {} ]", action.getActionName());
+ if (action.getOnSuccess() != null) {
+ action.getOnSuccess().accept(actionResult);
+ }
+ return true;
+ } else {
+ if (actionResult.getErrorMsg() != null) {
+ log.warn("Error completing action [ {} ] :- {} - [ATTEMPT] {}/{}",
+ action.getActionName(),
+ actionResult.getErrorMsg(),
+ i + 1, action.getNumRetries());
+ } else {
+ log.warn("Error completing action [ {} ] [ATTEMPT] {}/{}",
+ action.getActionName(),
+ i + 1, action.getNumRetries());
+ }
+
+ Thread.sleep(action.sleepBetweenInvocationsMs);
+ }
+ }
+ log.error("Failed completing action [ {} ]. Giving up!", action.getActionName());
+ if (action.getOnFail() != null) {
+ action.getOnFail().accept(action.getSupplier().get());
+ }
+ return false;
+ }
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
similarity index 65%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
rename to pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
index 5d78599..309e466 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ActionsTest.java
@@ -16,12 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.utils;
+import org.apache.pulsar.functions.utils.Actions;
import org.testng.annotations.Test;
import java.util.function.Supplier;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -29,23 +31,23 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-public class RuntimeUtilsTest {
+public class ActionsTest {
@Test
public void testActions() throws InterruptedException {
// Test for success
- Supplier<RuntimeUtils.Actions.ActionResult> supplier1 = mock(Supplier.class);
- when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+ Supplier<Actions.ActionResult> supplier1 = mock(Supplier.class);
+ when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
- Supplier<RuntimeUtils.Actions.ActionResult> supplier2 = mock(Supplier.class);
- when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+ Supplier<Actions.ActionResult> supplier2 = mock(Supplier.class);
+ when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
- Runnable onFail = mock(Runnable.class);
- Runnable onSucess = mock(Runnable.class);
+ java.util.function.Consumer<Actions.ActionResult> onFail = mock(java.util.function.Consumer.class);
+ java.util.function.Consumer<Actions.ActionResult> onSucess = mock(java.util.function.Consumer.class);
- RuntimeUtils.Actions.Action action1 = spy(
- RuntimeUtils.Actions.Action.builder()
+ Actions.Action action1 = spy(
+ Actions.Action.builder()
.actionName("action1")
.numRetries(10)
.sleepBetweenInvocationsMs(100)
@@ -55,38 +57,38 @@ public class RuntimeUtilsTest {
.onSuccess(onSucess)
.build());
- RuntimeUtils.Actions.Action action2 = spy(
- RuntimeUtils.Actions.Action.builder()
+ Actions.Action action2 = spy(
+ Actions.Action.builder()
.actionName("action2")
.numRetries(20)
.sleepBetweenInvocationsMs(200)
.supplier(supplier2)
.build());
- RuntimeUtils.Actions actions = RuntimeUtils.Actions.newBuilder()
+ Actions actions = Actions.newBuilder()
.addAction(action1)
.addAction(action2);
actions.run();
assertEquals(actions.numActions(), 2);
verify(supplier1, times(1)).get();
- verify(onFail, times(0)).run();
- verify(onSucess, times(1)).run();
+ verify(onFail, times(0)).accept(any());
+ verify(onSucess, times(1)).accept(any());
verify(supplier2, times(1)).get();
// test only run 1 action
supplier1 = mock(Supplier.class);
- when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+ when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
supplier2 = mock(Supplier.class);
- when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+ when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
- onFail = mock(Runnable.class);
- onSucess = mock(Runnable.class);
+ onFail = mock(java.util.function.Consumer.class);
+ onSucess = mock(java.util.function.Consumer.class);
action1 = spy(
- RuntimeUtils.Actions.Action.builder()
+ Actions.Action.builder()
.actionName("action1")
.numRetries(10)
.sleepBetweenInvocationsMs(100)
@@ -97,7 +99,7 @@ public class RuntimeUtilsTest {
.build());
action2 = spy(
- RuntimeUtils.Actions.Action.builder()
+ Actions.Action.builder()
.actionName("action2")
.numRetries(20)
.sleepBetweenInvocationsMs(200)
@@ -106,30 +108,30 @@ public class RuntimeUtilsTest {
.onSuccess(onSucess)
.build());
- actions = RuntimeUtils.Actions.newBuilder()
+ actions = Actions.newBuilder()
.addAction(action1)
.addAction(action2);
actions.run();
assertEquals(actions.numActions(), 2);
verify(supplier1, times(1)).get();
- verify(onFail, times(0)).run();
- verify(onSucess, times(1)).run();
+ verify(onFail, times(0)).accept(any());
+ verify(onSucess, times(1)).accept(any());
verify(supplier2, times(0)).get();
// test retry
supplier1 = mock(Supplier.class);
- when(supplier1.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(false).build());
+ when(supplier1.get()).thenReturn(Actions.ActionResult.builder().success(false).build());
supplier2 = mock(Supplier.class);
- when(supplier2.get()).thenReturn(RuntimeUtils.Actions.ActionResult.builder().success(true).build());
+ when(supplier2.get()).thenReturn(Actions.ActionResult.builder().success(true).build());
- onFail = mock(Runnable.class);
- onSucess = mock(Runnable.class);
+ onFail = mock(java.util.function.Consumer.class);
+ onSucess = mock(java.util.function.Consumer.class);
action1 = spy(
- RuntimeUtils.Actions.Action.builder()
+ Actions.Action.builder()
.actionName("action1")
.numRetries(10)
.sleepBetweenInvocationsMs(10)
@@ -140,22 +142,22 @@ public class RuntimeUtilsTest {
.build());
action2 = spy(
- RuntimeUtils.Actions.Action.builder()
+ Actions.Action.builder()
.actionName("action2")
.numRetries(20)
.sleepBetweenInvocationsMs(200)
.supplier(supplier2)
.build());
- actions = RuntimeUtils.Actions.newBuilder()
+ actions = Actions.newBuilder()
.addAction(action1)
.addAction(action2);
actions.run();
assertEquals(actions.numActions(), 2);
- verify(supplier1, times(10)).get();
- verify(onFail, times(1)).run();
- verify(onSucess, times(0)).run();
+ verify(supplier1, times(11)).get();
+ verify(onFail, times(1)).accept(any());
+ verify(onSucess, times(0)).accept(any());
verify(supplier2, times(1)).get();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 1d1014e..ae006cc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.RuntimeUtils;
+import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -61,8 +61,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -283,9 +281,9 @@ public class FunctionActioner {
: functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName();
try {
- RuntimeUtils.Actions.newBuilder()
+ Actions.newBuilder()
.addAction(
- RuntimeUtils.Actions.Action.builder()
+ Actions.Action.builder()
.actionName(String.format("Cleaning up subscriptions for function %s", fqfn))
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
@@ -300,7 +298,7 @@ public class FunctionActioner {
}
} catch (PulsarAdminException e) {
if (e instanceof PulsarAdminException.NotFoundException) {
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(true)
.build();
} else {
@@ -319,14 +317,14 @@ public class FunctionActioner {
}
String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(false)
.errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers))
.build();
}
}
- return RuntimeUtils.Actions.ActionResult.builder()
+ return Actions.ActionResult.builder()
.success(true)
.build();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 2a93494..c281b15 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -46,6 +47,7 @@ import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.Instance;
+import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
@@ -96,39 +98,44 @@ public class SchedulerManager implements AutoCloseable {
}
private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) {
- Stopwatch stopwatch = Stopwatch.createStarted();
- for (int i = 0; i < 6; i++) {
- try {
- return client.newProducer().topic(config.getFunctionAssignmentTopic())
- .enableBatching(false)
- .blockIfQueueFull(true)
- .compressionType(CompressionType.LZ4)
- .sendTimeout(0, TimeUnit.MILLISECONDS)
- .createAsync().get(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- log.error("Encountered exceptions at creating producer for topic {}",
- config.getFunctionAssignmentTopic(), e);
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- try {
- log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...",
- stopwatch.elapsed(TimeUnit.SECONDS));
- TimeUnit.SECONDS.sleep(10);
- } catch (InterruptedException e1) {
- log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- continue;
- }
+ Actions.Action createProducerAction = Actions.Action.builder()
+ .actionName(String.format("Creating producer for assignment topic %s", config.getFunctionAssignmentTopic()))
+ .numRetries(5)
+ .sleepBetweenInvocationsMs(10000)
+ .supplier(() -> {
+ try {
+ Producer<byte[]> producer = client.newProducer().topic(config.getFunctionAssignmentTopic())
+ .enableBatching(false)
+ .blockIfQueueFull(true)
+ .compressionType(CompressionType.LZ4)
+ .sendTimeout(0, TimeUnit.MILLISECONDS)
+ .createAsync().get(10, TimeUnit.SECONDS);
+ return Actions.ActionResult.builder().success(true).result(producer).build();
+ } catch (Exception e) {
+ log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
+ return Actions.ActionResult.builder()
+ .success(false)
+ .build();
+ }
+ })
+ .build();
+ AtomicReference<Producer<byte[]>> producer = new AtomicReference<>();
+ try {
+ Actions.newBuilder()
+ .addAction(createProducerAction.toBuilder()
+ .onSuccess((actionResult) -> producer.set((Producer<byte[]>) actionResult.getResult()))
+ .build())
+ .run();
+ } catch (InterruptedException e) {
+ log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ if (producer.get() == null) {
+ throw new RuntimeException("Can't create a producer on assignment topic "
+ + config.getFunctionAssignmentTopic());
}
- throw new RuntimeException("Can't create a producer on assignment topic "
- + config.getFunctionAssignmentTopic() + " in " + stopwatch.elapsed(TimeUnit.SECONDS)
- + " seconds, fail fast ...");
+ return producer.get();
}
public Future<?> schedule() {