You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/07 00:01:30 UTC
[incubator-pulsar] branch master updated: adding max retries for
assignment write wait (#1348)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e82ff7e adding max retries for assignment write wait (#1348)
e82ff7e is described below
commit e82ff7e3d5afc736e1ec4e39e165b062255b025a
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Mar 6 16:01:28 2018 -0800
adding max retries for assignment write wait (#1348)
* adding max retries for assignment write wait. Also fix issue with SchedulerAssignmentTest
* cleaning up tests
* cleaning up formatting
---
conf/functions_worker.yml | 1 +
.../pulsar/functions/worker/SchedulerManager.java | 6 +++
.../pulsar/functions/worker/WorkerConfig.java | 1 +
.../functions/worker/SchedulerManagerTest.java | 44 ++++++++--------------
4 files changed, 23 insertions(+), 29 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index c2c4091..daaeae9 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,3 +44,4 @@ functionAssignmentTopicName: "assignments"
failureCheckFreqMs: 30000
rescheduleTimeoutMs: 60000
initialBrokerReconnectMaxRetries: 60
+assignmentWriteMaxRetries: 60
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index de0228d..fd61161 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -164,13 +164,19 @@ public class SchedulerManager implements AutoCloseable {
}
// wait for assignment update to go throw the pipeline
+ int retries = 0;
while (this.functionRuntimeManager.getCurrentAssignmentVersion() < assignmentVersion) {
+ if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) {
+ log.warn("Max number of retries reached for waiting for assignment to propagate. Will continue now.");
+ break;
+ }
log.info("Waiting for assignments to propagate...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
+ retries++;
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 84f2438..579572a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -57,6 +57,7 @@ public class WorkerConfig implements Serializable {
private long failureCheckFreqMs;
private long rescheduleTimeoutMs;
private int initialBrokerReconnectMaxRetries;
+ private int assignmentWriteMaxRetries;
@Data
@Setter
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index f9a03db..54a245f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -41,9 +41,8 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
@@ -74,6 +73,7 @@ public class SchedulerManagerTest {
workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig()
.setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
+ workerConfig.setAssignmentWriteMaxRetries(0);
producer = mock(Producer.class);
completableFuture = spy(new CompletableFuture<>());
@@ -94,7 +94,8 @@ public class SchedulerManagerTest {
}
@Test
- public void testSchedule() throws PulsarClientException, NoSuchMethodException, InterruptedException {
+ public void testSchedule() throws PulsarClientException, NoSuchMethodException, InterruptedException,
+ TimeoutException, ExecutionException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
@@ -139,7 +140,7 @@ public class SchedulerManagerTest {
@Test
public void testNothingNewToSchedule() throws InterruptedException, ExecutionException, NoSuchMethodException,
- InvalidProtocolBufferException {
+ InvalidProtocolBufferException, TimeoutException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
@@ -191,7 +192,7 @@ public class SchedulerManagerTest {
@Test
public void testAddingFunctions() throws NoSuchMethodException, InterruptedException,
- InvalidProtocolBufferException {
+ InvalidProtocolBufferException, TimeoutException, ExecutionException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -255,7 +256,7 @@ public class SchedulerManagerTest {
@Test
public void testDeletingFunctions() throws NoSuchMethodException, InterruptedException,
- InvalidProtocolBufferException {
+ InvalidProtocolBufferException, TimeoutException, ExecutionException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -321,7 +322,8 @@ public class SchedulerManagerTest {
}
@Test
- public void testScalingUp() throws NoSuchMethodException, InterruptedException, InvalidProtocolBufferException, PulsarClientException {
+ public void testScalingUp() throws NoSuchMethodException, InterruptedException, InvalidProtocolBufferException,
+ PulsarClientException, TimeoutException, ExecutionException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -431,7 +433,7 @@ public class SchedulerManagerTest {
@Test
public void testScalingDown() throws PulsarClientException, NoSuchMethodException, InterruptedException,
- InvalidProtocolBufferException {
+ InvalidProtocolBufferException, TimeoutException, ExecutionException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -542,7 +544,7 @@ public class SchedulerManagerTest {
@Test
public void testUpdate() throws PulsarClientException, NoSuchMethodException, InterruptedException,
- InvalidProtocolBufferException {
+ InvalidProtocolBufferException, TimeoutException, ExecutionException {
List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
long version = 5;
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
@@ -666,29 +668,13 @@ public class SchedulerManagerTest {
);
}
- private void callSchedule() throws NoSuchMethodException, InterruptedException {
+ private void callSchedule() throws NoSuchMethodException, InterruptedException,
+ TimeoutException, ExecutionException {
long intialVersion = functionRuntimeManager.getCurrentAssignmentVersion();
- int initalCount = getMethodInvocationDetails(completableFuture,
- CompletableFuture.class.getMethod("get")).size();
- log.info("initalCount: {}", initalCount);
Future<?> complete = schedulerManager.schedule();
- int count = 0;
- while (!complete.isDone()) {
- int invocationCount = getMethodInvocationDetails(completableFuture,
- CompletableFuture.class.getMethod("get")).size();
- log.info("invocationCount: {}", invocationCount);
-
- if (invocationCount >= initalCount + 1) {
- doReturn(intialVersion + 1).when(functionRuntimeManager).getCurrentAssignmentVersion();
- }
-
- if (count > 100) {
- Assert.fail("Scheduler failed to terminate!");
- }
- Thread.sleep(100);
- count++;
- }
+ complete.get(30, TimeUnit.SECONDS);
+ doReturn(intialVersion + 1).when(functionRuntimeManager).getCurrentAssignmentVersion();
}
private List<Invocation> getMethodInvocationDetails(Object o, Method method) throws NoSuchMethodException {
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.