You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/07/07 01:34:58 UTC

[pulsar] branch master updated: Allow function rebalance to be run periodically (#7449)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 304685b  Allow function rebalance to be run periodically (#7449)
304685b is described below

commit 304685bfd20912ee058e15ecafcf34d39b470a47
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Jul 6 18:34:42 2020 -0700

    Allow function rebalance to be run periodically (#7449)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 conf/functions_worker.yml                           |  2 ++
 .../pulsar/functions/worker/WorkerConfig.java       |  5 +++++
 .../pulsar/functions/worker/SchedulerManager.java   | 21 ++++++++++++++++++---
 .../pulsar/functions/worker/WorkerService.java      | 14 ++++++++++++++
 .../functions/worker/rest/api/WorkerImpl.java       |  7 ++++---
 5 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 7ab160e..1b0b9f8 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -93,6 +93,8 @@ schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinSche
 functionAssignmentTopicName: "assignments"
 failureCheckFreqMs: 30000
 rescheduleTimeoutMs: 60000
+# Frequency, in seconds, at which to check whether the cluster needs rebalancing (set to -1, or any value <= 0, to disable)
+rebalanceCheckFreqSec: -1
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 4b0ddd1..631ae6f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -241,6 +241,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private long rescheduleTimeoutMs;
     @FieldContext(
+            category = CATEGORY_FUNC_RUNTIME_MNG,
+            doc = "The frequency to check whether the cluster needs rebalancing"
+    )
+    private long rebalanceCheckFreqSec;
+    @FieldContext(
         category = CATEGORY_FUNC_RUNTIME_MNG,
         doc = "The max number of retries for initial broker reconnects when function metadata manager"
             + " tries to create producer on metadata topics"
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 4341310..374512a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -113,6 +113,8 @@ public class SchedulerManager implements AutoCloseable {
     private MessageId lastMessageProduced = null;
 
     private MessageId metadataTopicLastMessage = MessageId.earliest;
+    private Future<?> currentRebalanceFuture;
+    private AtomicBoolean rebalanceInProgess = new AtomicBoolean(false);
 
     public SchedulerManager(WorkerConfig workerConfig,
                             PulsarClient pulsarClient,
@@ -222,9 +224,18 @@ public class SchedulerManager implements AutoCloseable {
         return scheduleInternal(() -> invokeScheduler(), "Encountered error when invoking scheduler");
     }
 
-    public Future<?> rebalance() {
+    private Future<?> rebalance() {
         return scheduleInternal(() -> invokeRebalance(), "Encountered error when invoking rebalance");
     }
+
+    public Future<?> rebalanceIfNotInprogress() {
+        if (rebalanceInProgess.compareAndSet(false, true)) {
+            currentRebalanceFuture = rebalance();
+            return currentRebalanceFuture;
+        } else {
+            throw new RebalanceInProgressException();
+        }
+    }
     
     @VisibleForTesting
     void invokeScheduler() {
@@ -273,7 +284,7 @@ public class SchedulerManager implements AutoCloseable {
                     MessageId messageId = publishNewAssignment(newAssignment, false);
 
                     // Directly update in memory assignment cache since I am leader
-                    log.info("Updating assignment: {}", assignment);
+                    log.info("Updating assignment: {}", newAssignment);
                     functionRuntimeManager.processAssignment(newAssignment);
                     // update message id associated with current view of assignments map
                     lastMessageProduced = messageId;
@@ -358,7 +369,8 @@ public class SchedulerManager implements AutoCloseable {
             // update message id associated with current view of assignments map
             lastMessageProduced = messageId;
         }
-        log.info("Total number of new assignments computed for rebalance: {}", rebalancedAssignments.size());
+        log.info("Rebalance - Total number of new assignments computed: {}", rebalancedAssignments.size());
+        rebalanceInProgess.set(false);
     }
 
     private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) {
@@ -511,4 +523,7 @@ public class SchedulerManager implements AutoCloseable {
         }
         return null;
     }
+
+    public static class RebalanceInProgressException extends RuntimeException {
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 7754a82..5b493bc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -257,6 +257,20 @@ public class WorkerService {
                         }
                     });
 
+            if (workerConfig.getRebalanceCheckFreqSec() > 0) {
+                clusterServiceCoordinator.addTask("rebalance-periodic-check",
+                        workerConfig.getRebalanceCheckFreqSec() * 1000,
+                        () -> {
+                            try {
+                                schedulerManager.rebalanceIfNotInprogress().get();
+                            } catch (SchedulerManager.RebalanceInProgressException e) {
+                                log.info("Scheduled for rebalance but rebalance is already in progress. Ignoring.");
+                            } catch (Exception e) {
+                                log.warn("Encountered error when running scheduled rebalance", e);
+                            }
+                        });
+            }
+
             log.info("/** Starting Cluster Service Coordinator **/");
             clusterServiceCoordinator.start();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index f58f57b..9e24f38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
+import org.apache.pulsar.functions.worker.SchedulerManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 
@@ -217,9 +218,9 @@ public class WorkerImpl {
         }
 
         if (worker().getLeaderService().isLeader()) {
-            if (currentRebalanceFuture == null || currentRebalanceFuture.isDone()) {
-                currentRebalanceFuture = this.worker().getSchedulerManager().rebalance();
-            } else {
+            try {
+                worker().getSchedulerManager().rebalanceIfNotInprogress();
+            } catch (SchedulerManager.RebalanceInProgressException e) {
                 throw new RestException(Status.BAD_REQUEST, "Rebalance already in progress");
             }
         } else {