You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/07/07 01:34:58 UTC
[pulsar] branch master updated: Allow function rebalance to be run
periodically (#7449)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 304685b Allow function rebalance to be run periodically (#7449)
304685b is described below
commit 304685bfd20912ee058e15ecafcf34d39b470a47
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Jul 6 18:34:42 2020 -0700
Allow function rebalance to be run periodically (#7449)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
conf/functions_worker.yml | 2 ++
.../pulsar/functions/worker/WorkerConfig.java | 5 +++++
.../pulsar/functions/worker/SchedulerManager.java | 21 ++++++++++++++++++---
.../pulsar/functions/worker/WorkerService.java | 14 ++++++++++++++
.../functions/worker/rest/api/WorkerImpl.java | 7 ++++---
5 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 7ab160e..1b0b9f8 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -93,6 +93,8 @@ schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinSche
functionAssignmentTopicName: "assignments"
failureCheckFreqMs: 30000
rescheduleTimeoutMs: 60000
+# Frequency, in seconds, at which to check whether the cluster needs rebalancing (any value <= 0, e.g. -1, disables the check)
+rebalanceCheckFreqSec: -1
initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 4b0ddd1..631ae6f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -241,6 +241,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private long rescheduleTimeoutMs;
@FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "The frequency to check whether the cluster needs rebalancing"
+ )
+ private long rebalanceCheckFreqSec;
+ @FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The max number of retries for initial broker reconnects when function metadata manager"
+ " tries to create producer on metadata topics"
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 4341310..374512a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -113,6 +113,8 @@ public class SchedulerManager implements AutoCloseable {
private MessageId lastMessageProduced = null;
private MessageId metadataTopicLastMessage = MessageId.earliest;
+ private Future<?> currentRebalanceFuture;
+ private AtomicBoolean rebalanceInProgess = new AtomicBoolean(false);
public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
@@ -222,9 +224,18 @@ public class SchedulerManager implements AutoCloseable {
return scheduleInternal(() -> invokeScheduler(), "Encountered error when invoking scheduler");
}
- public Future<?> rebalance() {
+ private Future<?> rebalance() {
return scheduleInternal(() -> invokeRebalance(), "Encountered error when invoking rebalance");
}
+
+ public Future<?> rebalanceIfNotInprogress() {
+ if (rebalanceInProgess.compareAndSet(false, true)) {
+ currentRebalanceFuture = rebalance();
+ return currentRebalanceFuture;
+ } else {
+ throw new RebalanceInProgressException();
+ }
+ }
@VisibleForTesting
void invokeScheduler() {
@@ -273,7 +284,7 @@ public class SchedulerManager implements AutoCloseable {
MessageId messageId = publishNewAssignment(newAssignment, false);
// Directly update in memory assignment cache since I am leader
- log.info("Updating assignment: {}", assignment);
+ log.info("Updating assignment: {}", newAssignment);
functionRuntimeManager.processAssignment(newAssignment);
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
@@ -358,7 +369,8 @@ public class SchedulerManager implements AutoCloseable {
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
}
- log.info("Total number of new assignments computed for rebalance: {}", rebalancedAssignments.size());
+ log.info("Rebalance - Total number of new assignments computed: {}", rebalancedAssignments.size());
+ rebalanceInProgess.set(false);
}
private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) {
@@ -511,4 +523,7 @@ public class SchedulerManager implements AutoCloseable {
}
return null;
}
+
+ public static class RebalanceInProgressException extends RuntimeException {
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 7754a82..5b493bc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -257,6 +257,20 @@ public class WorkerService {
}
});
+ if (workerConfig.getRebalanceCheckFreqSec() > 0) {
+ clusterServiceCoordinator.addTask("rebalance-periodic-check",
+ workerConfig.getRebalanceCheckFreqSec() * 1000,
+ () -> {
+ try {
+ schedulerManager.rebalanceIfNotInprogress().get();
+ } catch (SchedulerManager.RebalanceInProgressException e) {
+ log.info("Scheduled for rebalance but rebalance is already in progress. Ignoring.");
+ } catch (Exception e) {
+ log.warn("Encountered error when running scheduled rebalance", e);
+ }
+ });
+ }
+
log.info("/** Starting Cluster Service Coordinator **/");
clusterServiceCoordinator.start();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index f58f57b..9e24f38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
+import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
@@ -217,9 +218,9 @@ public class WorkerImpl {
}
if (worker().getLeaderService().isLeader()) {
- if (currentRebalanceFuture == null || currentRebalanceFuture.isDone()) {
- currentRebalanceFuture = this.worker().getSchedulerManager().rebalance();
- } else {
+ try {
+ worker().getSchedulerManager().rebalanceIfNotInprogress();
+ } catch (SchedulerManager.RebalanceInProgressException e) {
throw new RestException(Status.BAD_REQUEST, "Rebalance already in progress");
}
} else {