You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 20:47:09 UTC
helix git commit: [HELIX-709] Move external view calculation to async stage and re-organize pipeline
Repository: helix
Updated Branches:
refs/heads/master 37f3d4c8d -> 542fbc840
[HELIX-709] Move external view calculation to async stage and re-organize pipeline
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/542fbc84
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/542fbc84
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/542fbc84
Branch: refs/heads/master
Commit: 542fbc840a167986a40bd57f3c5660d294acb63c
Parents: 37f3d4c
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jul 9 12:16:56 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Mon Jul 9 12:16:56 2018 -0700
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 29 ++++++++++----------
.../controller/pipeline/AsyncWorkerType.java | 3 +-
.../stages/ExternalViewComputeStage.java | 12 ++++++--
.../helix/manager/zk/CallbackHandler.java | 5 ++--
.../java/org/apache/helix/ZkUnitTestBase.java | 12 +++++++-
5 files changed, 39 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7603975..474d621 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -248,11 +248,14 @@ public class GenericHelixController implements IdealStateChangeListener,
Pipeline dataRefresh = new Pipeline(pipelineName);
dataRefresh.addStage(new ReadClusterDataStage());
+ // data pre-process pipeline
+ Pipeline dataPreprocess = new Pipeline(pipelineName);
+ dataPreprocess.addStage(new ResourceComputationStage());
+ dataPreprocess.addStage(new ResourceValidationStage());
+ dataPreprocess.addStage(new CurrentStateComputationStage());
+
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline(pipelineName);
- rebalancePipeline.addStage(new ResourceComputationStage());
- rebalancePipeline.addStage(new ResourceValidationStage());
- rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageGenerationPhase());
@@ -270,18 +273,16 @@ public class GenericHelixController implements IdealStateChangeListener,
Pipeline liveInstancePipeline = new Pipeline(pipelineName);
liveInstancePipeline.addStage(new CompatibilityCheckStage());
- registry.register(ClusterEventType.IdealStateChange, dataRefresh, rebalancePipeline);
- registry.register(ClusterEventType.CurrentStateChange, dataRefresh, rebalancePipeline, externalViewPipeline);
- registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, rebalancePipeline);
- registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, rebalancePipeline);
- registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, rebalancePipeline);
- registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, rebalancePipeline,
- externalViewPipeline);
- registry.register(ClusterEventType.MessageChange, dataRefresh, rebalancePipeline);
+ registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, rebalancePipeline);
+ registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
+ registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
+ registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
+ registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
+ registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
+ registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.ExternalViewChange, dataRefresh);
- registry.register(ClusterEventType.Resume, dataRefresh, rebalancePipeline, externalViewPipeline);
- registry
- .register(ClusterEventType.PeriodicalRebalance, dataRefresh, rebalancePipeline, externalViewPipeline);
+ registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
+ registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
return registry;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
index 62e324c..443db31 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -28,5 +28,6 @@ package org.apache.helix.controller.pipeline;
public enum AsyncWorkerType {
TargetExternalViewCalcWorker,
- PersistAssignmentWorker
+ PersistAssignmentWorker,
+ ExternalViewComputeWorker
}
http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e2bd2a9..591867d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -22,7 +22,8 @@ package org.apache.helix.controller.stages;
import org.apache.helix.*;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecordDelta.MergeOperation;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.*;
@@ -33,11 +34,16 @@ import org.slf4j.LoggerFactory;
import java.util.*;
-public class ExternalViewComputeStage extends AbstractBaseStage {
+public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class);
@Override
- public void process(ClusterEvent event) throws Exception {
+ public AsyncWorkerType getAsyncWorkerType() {
+ return AsyncWorkerType.ExternalViewComputeWorker;
+ }
+
+ @Override
+ public void execute(final ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 12f7d0f..cd446e8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -643,9 +643,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) {
if (logger.isDebugEnabled()) {
- logger.debug(
- "Data change callback: child changed, path: " + parentPath + ", current child count: "
- + currentChilds.size());
+ logger.debug("Data change callback: child changed, path: {} , current child count: {}",
+ parentPath, currentChilds == null ? 0 : currentChilds.size());
}
try {
http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index b29375b..483f0af 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -28,6 +28,7 @@ import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
@@ -349,7 +350,16 @@ public class ZkUnitTestBase {
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
- stage.process(event);
+
+ // AbstractAsyncBaseStage runs asynchronously, and its main logic is implemented in
+ // the execute() function call
+ // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage()
+ // to a shared library
+ if (stage instanceof AbstractAsyncBaseStage) {
+ ((AbstractAsyncBaseStage) stage).execute(event);
+ } else {
+ stage.process(event);
+ }
stage.postProcess();
}