You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/12/06 00:44:11 UTC
[helix] branch wagedRebalancer updated: Decouple the event type and
the scheduled rebalance cache refresh option. (#638)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedRebalancer by this push:
new 51f960e Decouple the event type and the scheduled rebalance cache refresh option. (#638)
51f960e is described below
commit 51f960efa37417aa2f6cd0664ffd2c3c99b8a936
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Dec 5 16:44:02 2019 -0800
Decouple the event type and the scheduled rebalance cache refresh option. (#638)
In the previous design, both the on-demand and the periodic rebalance scheduling tasks always requested a cache refresh. This will not always be the case going forward.
For example, the WAGED rebalancer's asynchronous baseline calculation requests a scheduled rebalance, but a cache refresh is not necessary for it.
This PR does not change any business logic. It prepares for future feature changes.
This PR ensures strict backward compatibility.
---
.../helix/controller/GenericHelixController.java | 40 ++++++++++++++++++----
.../java/org/apache/helix/util/RebalanceUtil.java | 7 +++-
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index b80b8cc..3ccea41 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,13 +19,13 @@ package org.apache.helix.controller;
* under the License.
*/
-import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -36,6 +36,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Sets;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -98,7 +100,7 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.helix.HelixConstants.*;
+import static org.apache.helix.HelixConstants.ChangeType;
/**
* Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State.
@@ -220,17 +222,29 @@ public class GenericHelixController implements IdealStateChangeListener,
class RebalanceTask extends TimerTask {
final HelixManager _manager;
final ClusterEventType _clusterEventType;
+ private final Optional<Boolean> _shouldRefreshCacheOption;
private long _nextRebalanceTime;
public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
this(manager, clusterEventType, -1);
+ }
+ public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+ long nextRebalanceTime) {
+ this(manager, clusterEventType, nextRebalanceTime, Optional.empty());
}
- public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType, long nextRebalanceTime) {
+ public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+ long nextRebalanceTime, boolean shouldRefreshCache) {
+ this(manager, clusterEventType, nextRebalanceTime, Optional.of(shouldRefreshCache));
+ }
+
+ private RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+ long nextRebalanceTime, Optional<Boolean> shouldRefreshCacheOption) {
_manager = manager;
_clusterEventType = clusterEventType;
_nextRebalanceTime = nextRebalanceTime;
+ _shouldRefreshCacheOption = shouldRefreshCacheOption;
}
public long getNextRebalanceTime() {
@@ -240,8 +254,9 @@ public class GenericHelixController implements IdealStateChangeListener,
@Override
public void run() {
try {
- if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
- .equals(ClusterEventType.OnDemandRebalance)) {
+ if (_shouldRefreshCacheOption.orElse(
+ _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
+ .equals(ClusterEventType.OnDemandRebalance))) {
requestDataProvidersFullRefresh();
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -359,7 +374,17 @@ public class GenericHelixController implements IdealStateChangeListener,
* Schedule an on demand rebalance pipeline.
* @param delay
*/
+ @Deprecated
public void scheduleOnDemandRebalance(long delay) {
+ scheduleOnDemandRebalance(delay, true);
+ }
+
+ /**
+ * Schedule an on demand rebalance pipeline.
+ * @param delay
+ * @param shouldRefreshCache true if refresh the cache before scheduling a rebalance.
+ */
+ public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) {
if (_helixManager == null) {
logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!",
_clusterName);
@@ -377,7 +402,8 @@ public class GenericHelixController implements IdealStateChangeListener,
}
RebalanceTask newTask =
- new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime);
+ new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime,
+ shouldRefreshCache);
_onDemandRebalanceTimer.schedule(newTask, delay);
logger.info("Scheduled instant pipeline run for cluster {}." , _helixManager.getClusterName());
@@ -1232,4 +1258,4 @@ public class GenericHelixController implements IdealStateChangeListener,
eventThread.setDaemon(true);
eventThread.start();
}
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 18163bd..050762d 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -143,6 +143,11 @@ public class RebalanceUtil {
}
public static void scheduleOnDemandPipeline(String clusterName, long delay) {
+ scheduleOnDemandPipeline(clusterName, delay, true);
+ }
+
+ public static void scheduleOnDemandPipeline(String clusterName, long delay,
+ boolean shouldRefreshCache) {
if (clusterName == null) {
LOG.error("Failed to issue a pipeline run. ClusterName is null.");
return;
@@ -153,7 +158,7 @@ public class RebalanceUtil {
}
GenericHelixController controller = GenericHelixController.getController(clusterName);
if (controller != null) {
- controller.scheduleOnDemandRebalance(delay);
+ controller.scheduleOnDemandRebalance(delay, shouldRefreshCache);
} else {
LOG.error("Failed to issue a pipeline. Controller for cluster {} does not exist.",
clusterName);