You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/09/17 22:51:34 UTC

[2/4] helix git commit: For performance improvement, we need to differentiate the task pipeline and regular pipeline.

For performance improvement, we need to differentiate the task pipeline and regular pipeline.

1. Split the task pipeline out from regular pipeline.
2. Remove unnecessary stages for Task pipeline which are independent from previous outputs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c45d3a66
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c45d3a66
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c45d3a66

Branch: refs/heads/master
Commit: c45d3a66f55dcddb2b95a1872e76f3030572d2c7
Parents: fc868b3
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Jun 7 17:37:54 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 17 15:45:04 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 56 ++++++++++++++++++--
 1 file changed, 53 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c45d3a66/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8d1e44b..9f94755 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -150,12 +150,12 @@ public class GenericHelixController implements IdealStateChangeListener,
    */
   public GenericHelixController() {
     this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createDefaultRegistry(PipelineTypes.TASK.name()));
+        createTaskRegistry(PipelineTypes.TASK.name()));
   }
 
   public GenericHelixController(String clusterName) {
     this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createDefaultRegistry(PipelineTypes.TASK.name()), clusterName);
+        createTaskRegistry(PipelineTypes.TASK.name()), clusterName);
   }
 
   class RebalanceTask extends TimerTask {
@@ -281,13 +281,63 @@ public class GenericHelixController implements IdealStateChangeListener,
       registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
       registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
       registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.ExternalViewChange, dataRefresh);
       registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
       registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
       return registry;
     }
   }
 
+  private static PipelineRegistry createTaskRegistry(String pipelineName) {
+    logger.info("createDefaultRegistry");
+    synchronized (GenericHelixController.class) {
+      PipelineRegistry registry = new PipelineRegistry();
+
+      // cluster data cache refresh
+      Pipeline dataRefresh = new Pipeline(pipelineName);
+      dataRefresh.addStage(new ReadClusterDataStage());
+
+      // data pre-process pipeline
+      Pipeline dataPreprocess = new Pipeline(pipelineName);
+      dataPreprocess.addStage(new ResourceComputationStage());
+      dataPreprocess.addStage(new ResourceValidationStage());
+      dataPreprocess.addStage(new CurrentStateComputationStage());
+
+      // rebalance pipeline
+      // TODO: Junkai will work on refactoring existing pipeline log into abstract logic and
+      // extend the logic to separate pipeline
+      Pipeline rebalancePipeline = new Pipeline(pipelineName);
+      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new IntermediateStateCalcStage());
+      rebalancePipeline.addStage(new MessageGenerationPhase());
+      rebalancePipeline.addStage(new MessageSelectionStage());
+      rebalancePipeline.addStage(new MessageThrottleStage());
+      rebalancePipeline.addStage(new TaskAssignmentStage());
+
+      // backward compatibility check
+      Pipeline liveInstancePipeline = new Pipeline(pipelineName);
+      liveInstancePipeline.addStage(new CompatibilityCheckStage());
+
+      registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline,
+          dataPreprocess, rebalancePipeline);
+      registry
+          .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      return registry;
+    }
+  }
+
   public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry) {
     this(registry, taskRegistry, null);
   }