You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 20:47:09 UTC

helix git commit: [HELIX-709] Move external view calculation to async stage and re-organize pipeline

Repository: helix
Updated Branches:
  refs/heads/master 37f3d4c8d -> 542fbc840


[HELIX-709] Move external view calculation to async stage and re-organize pipeline


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/542fbc84
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/542fbc84
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/542fbc84

Branch: refs/heads/master
Commit: 542fbc840a167986a40bd57f3c5660d294acb63c
Parents: 37f3d4c
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jul 9 12:16:56 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Mon Jul 9 12:16:56 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 29 ++++++++++----------
 .../controller/pipeline/AsyncWorkerType.java    |  3 +-
 .../stages/ExternalViewComputeStage.java        | 12 ++++++--
 .../helix/manager/zk/CallbackHandler.java       |  5 ++--
 .../java/org/apache/helix/ZkUnitTestBase.java   | 12 +++++++-
 5 files changed, 39 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7603975..474d621 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -248,11 +248,14 @@ public class GenericHelixController implements IdealStateChangeListener,
       Pipeline dataRefresh = new Pipeline(pipelineName);
       dataRefresh.addStage(new ReadClusterDataStage());
 
+      // data pre-process pipeline
+      Pipeline dataPreprocess = new Pipeline(pipelineName);
+      dataPreprocess.addStage(new ResourceComputationStage());
+      dataPreprocess.addStage(new ResourceValidationStage());
+      dataPreprocess.addStage(new CurrentStateComputationStage());
+
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline(pipelineName);
-      rebalancePipeline.addStage(new ResourceComputationStage());
-      rebalancePipeline.addStage(new ResourceValidationStage());
-      rebalancePipeline.addStage(new CurrentStateComputationStage());
       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
       rebalancePipeline.addStage(new IntermediateStateCalcStage());
       rebalancePipeline.addStage(new MessageGenerationPhase());
@@ -270,18 +273,16 @@ public class GenericHelixController implements IdealStateChangeListener,
       Pipeline liveInstancePipeline = new Pipeline(pipelineName);
       liveInstancePipeline.addStage(new CompatibilityCheckStage());
 
-      registry.register(ClusterEventType.IdealStateChange, dataRefresh, rebalancePipeline);
-      registry.register(ClusterEventType.CurrentStateChange, dataRefresh, rebalancePipeline, externalViewPipeline);
-      registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, rebalancePipeline);
-      registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, rebalancePipeline);
-      registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, rebalancePipeline);
-      registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, rebalancePipeline,
-          externalViewPipeline);
-      registry.register(ClusterEventType.MessageChange, dataRefresh, rebalancePipeline);
+      registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
       registry.register(ClusterEventType.ExternalViewChange, dataRefresh);
-      registry.register(ClusterEventType.Resume, dataRefresh, rebalancePipeline, externalViewPipeline);
-      registry
-          .register(ClusterEventType.PeriodicalRebalance, dataRefresh, rebalancePipeline, externalViewPipeline);
+      registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
       return registry;
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index 62e324c..443db31 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -28,5 +28,6 @@ package org.apache.helix.controller.pipeline;
 
 public enum AsyncWorkerType {
   TargetExternalViewCalcWorker,
-  PersistAssignmentWorker
+  PersistAssignmentWorker,
+  ExternalViewComputeWorker
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e2bd2a9..591867d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -22,7 +22,8 @@ package org.apache.helix.controller.stages;
 import org.apache.helix.*;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.*;
@@ -33,11 +34,16 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-public class ExternalViewComputeStage extends AbstractBaseStage {
+public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.ExternalViewComputeWorker;
+  }
+
+  @Override
+  public void execute(final ClusterEvent event) throws Exception {
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());

http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 12f7d0f..cd446e8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -643,9 +643,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   @Override
   public void handleChildChange(String parentPath, List<String> currentChilds) {
     if (logger.isDebugEnabled()) {
-      logger.debug(
-          "Data change callback: child changed, path: " + parentPath + ", current child count: "
-              + currentChilds.size());
+      logger.debug("Data change callback: child changed, path: {} , current child count: {}",
+          parentPath, currentChilds == null ? 0 : currentChilds.size());
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index b29375b..483f0af 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -28,6 +28,7 @@ import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
@@ -349,7 +350,16 @@ public class ZkUnitTestBase {
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
-    stage.process(event);
+
+    // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in
+    // execute() function call
+    // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage()
+    // to a shared library
+    if (stage instanceof AbstractAsyncBaseStage) {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    } else {
+      stage.process(event);
+    }
     stage.postProcess();
   }