You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/12/06 00:44:11 UTC

[helix] branch wagedRebalancer updated: Decouple the event type and the scheduled rebalance cache refresh option. (#638)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 51f960e  Decouple the event type and the scheduled rebalance cache refresh option. (#638)
51f960e is described below

commit 51f960efa37417aa2f6cd0664ffd2c3c99b8a936
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Dec 5 16:44:02 2019 -0800

    Decouple the event type and the scheduled rebalance cache refresh option. (#638)
    
    In the previous design, both on-demand and periodic rebalance scheduling tasks always request a cache refresh. This won't always be true moving forward.
    For example, the WAGED rebalancer's async baseline calculation requests a scheduled rebalance, but a cache refresh won't be necessary in that case.
    This PR does not change any business logic. It prepares for future feature changes.
    This PR ensures strict backward compatibility.
---
 .../helix/controller/GenericHelixController.java   | 40 ++++++++++++++++++----
 .../java/org/apache/helix/util/RebalanceUtil.java  |  7 +++-
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index b80b8cc..3ccea41 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,13 +19,13 @@ package org.apache.helix.controller;
  * under the License.
  */
 
-import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -36,6 +36,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Sets;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -98,7 +100,7 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.helix.HelixConstants.*;
+import static org.apache.helix.HelixConstants.ChangeType;
 
 /**
  * Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State.
@@ -220,17 +222,29 @@ public class GenericHelixController implements IdealStateChangeListener,
   class RebalanceTask extends TimerTask {
     final HelixManager _manager;
     final ClusterEventType _clusterEventType;
+    private final Optional<Boolean> _shouldRefreshCacheOption;
     private long _nextRebalanceTime;
 
     public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
       this(manager, clusterEventType, -1);
+    }
 
+    public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+        long nextRebalanceTime) {
+      this(manager, clusterEventType, nextRebalanceTime, Optional.empty());
     }
 
-    public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType, long nextRebalanceTime) {
+    public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+        long nextRebalanceTime, boolean shouldRefreshCache) {
+      this(manager, clusterEventType, nextRebalanceTime, Optional.of(shouldRefreshCache));
+    }
+
+    private RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+        long nextRebalanceTime, Optional<Boolean> shouldRefreshCacheOption) {
       _manager = manager;
       _clusterEventType = clusterEventType;
       _nextRebalanceTime = nextRebalanceTime;
+      _shouldRefreshCacheOption = shouldRefreshCacheOption;
     }
 
     public long getNextRebalanceTime() {
@@ -240,8 +254,9 @@ public class GenericHelixController implements IdealStateChangeListener,
     @Override
     public void run() {
       try {
-        if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
-            .equals(ClusterEventType.OnDemandRebalance)) {
+        if (_shouldRefreshCacheOption.orElse(
+            _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
+                .equals(ClusterEventType.OnDemandRebalance))) {
           requestDataProvidersFullRefresh();
 
           HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -359,7 +374,17 @@ public class GenericHelixController implements IdealStateChangeListener,
    * Schedule an on demand rebalance pipeline.
    * @param delay
    */
+  @Deprecated
   public void scheduleOnDemandRebalance(long delay) {
+    scheduleOnDemandRebalance(delay, true);
+  }
+
+  /**
+   * Schedule an on demand rebalance pipeline.
+   * @param delay
+   * @param shouldRefreshCache true to request a full cache refresh when the scheduled rebalance task runs.
+   */
+  public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) {
     if (_helixManager == null) {
       logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!",
           _clusterName);
@@ -377,7 +402,8 @@ public class GenericHelixController implements IdealStateChangeListener,
     }
 
     RebalanceTask newTask =
-        new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime);
+        new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime,
+            shouldRefreshCache);
 
     _onDemandRebalanceTimer.schedule(newTask, delay);
     logger.info("Scheduled instant pipeline run for cluster {}." , _helixManager.getClusterName());
@@ -1232,4 +1258,4 @@ public class GenericHelixController implements IdealStateChangeListener,
     eventThread.setDaemon(true);
     eventThread.start();
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 18163bd..050762d 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -143,6 +143,11 @@ public class RebalanceUtil {
   }
 
   public static void scheduleOnDemandPipeline(String clusterName, long delay) {
+    scheduleOnDemandPipeline(clusterName, delay, true);
+  }
+
+  public static void scheduleOnDemandPipeline(String clusterName, long delay,
+      boolean shouldRefreshCache) {
     if (clusterName == null) {
       LOG.error("Failed to issue a pipeline run. ClusterName is null.");
       return;
@@ -153,7 +158,7 @@ public class RebalanceUtil {
     }
     GenericHelixController controller = GenericHelixController.getController(clusterName);
     if (controller != null) {
-      controller.scheduleOnDemandRebalance(delay);
+      controller.scheduleOnDemandRebalance(delay, shouldRefreshCache);
     } else {
       LOG.error("Failed to issue a pipeline. Controller for cluster {} does not exist.",
           clusterName);