You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/09/21 22:14:59 UTC

helix git commit: Create AbstractTaskDispatcher for future Task Framework

Repository: helix
Updated Branches:
  refs/heads/master b91d6eee4 -> 1b4e0bbb8


Create AbstractTaskDispatcher for future Task Framework

Refactor existing code logic that move future needed logic to AbstractTaskDispatcher.

RB=1308274
BUG=HELIX-985
G=helix-reviewers
A=hrzhang


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1b4e0bbb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1b4e0bbb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1b4e0bbb

Branch: refs/heads/master
Commit: 1b4e0bbb89736ea7e0bd92c1af98e1ab102397f1
Parents: b91d6ee
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri May 11 11:44:11 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Sep 21 15:14:38 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AbstractTaskDispatcher.java      |  5 +++-
 .../org/apache/helix/task/JobRebalancer.java    | 17 +++++++-----
 .../org/apache/helix/task/TaskRebalancer.java   | 28 +++++++++++---------
 .../java/org/apache/helix/task/TaskUtil.java    |  4 +++
 4 files changed, 34 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index e230fb5..9a5b899 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -19,6 +19,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.assigner.AssignableInstance;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +67,7 @@ public abstract class AbstractTaskDispatcher {
         // Check for pending state transitions on this (partition, instance).
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
+
         if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
           processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
               pendingMessage, jobState, currState, paMap, assignedPartitions);
@@ -818,6 +820,7 @@ public abstract class AbstractTaskDispatcher {
         incomplete = true;
       }
     }
+
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
       return true;
@@ -925,4 +928,4 @@ public abstract class AbstractTaskDispatcher {
     // This is a targeted task
     return pName(jobCfg.getJobId(), partitionNum);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 5b29c23..4dbf0a0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -33,6 +33,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
@@ -55,8 +56,9 @@ public class JobRebalancer extends TaskRebalancer {
   private static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   @Override
-  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
-      IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+  public ResourceAssignment computeBestPossiblePartitionState(
+      ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+      CurrentStateOutput currStateOutput) {
     final String jobName = resource.getResourceName();
     LOG.debug("Computer Best Partition for job: " + jobName);
 
@@ -258,6 +260,7 @@ public class JobRebalancer extends TaskRebalancer {
         || (jobCfg.getTargetResource() != null
         && cache.getIdealState(jobCfg.getTargetResource()) != null
         && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
+
       if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
         failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
         return buildEmptyAssignment(jobResource, currStateOutput);
@@ -340,8 +343,8 @@ public class JobRebalancer extends TaskRebalancer {
       Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
           instance);
       // If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
-      if (state == TaskPartitionState.RUNNING
-          || (state == TaskPartitionState.INIT && pendingMessage != null)) {
+      if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT
+          && pendingMessage != null)) {
         return false;
       }
     }
@@ -351,6 +354,7 @@ public class JobRebalancer extends TaskRebalancer {
   /**
    * Get the last task assignment for a given job
    * @param resourceName the name of the job
+   *
    * @return {@link ResourceAssignment} instance, or null if no assignment is available
    */
   private ResourceAssignment getPrevResourceAssignment(String resourceName) {
@@ -395,8 +399,9 @@ public class JobRebalancer extends TaskRebalancer {
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
+   * @param prevAssignment    task partition -> (instance -> state)
    * @param allTaskPartitions all task partitionIds
+   *
    * @return instance -> partitionIds from previous assignment, if the instance is still live
    */
   private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
@@ -438,4 +443,4 @@ public class JobRebalancer extends TaskRebalancer {
     }
     return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index dae0da6..2f88e24 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -47,26 +47,25 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
     implements Rebalancer, MappingCalculator {
   private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class);
 
-  @Override
-  public void init(HelixManager manager) {
+  @Override public void init(HelixManager manager) {
     _manager = manager;
   }
 
-  @Override
-  public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+  @Override public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput);
 
   /**
    * Checks if the workflow has been stopped.
+   *
    * @param ctx Workflow context containing task states
    * @param cfg Workflow config containing set of tasks
+   *
    * @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
    */
   protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
     for (String job : cfg.getJobDag().getAllNodes()) {
       TaskState jobState = ctx.getJobState(job);
-      if (jobState != null
-          && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) {
+      if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) {
         return false;
       }
     }
@@ -90,9 +89,11 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
 
   /**
    * Check all the dependencies of a job to determine whether the job is ready to be scheduled.
+   *
    * @param job
    * @param workflowCfg
    * @param workflowCtx
+   *
    * @return
    */
   protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
@@ -116,8 +117,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
     // If there is any parent job not started, this job should not be scheduled
     if (notStartedCount > 0) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Job %s is not ready to start, notStartedParent(s)=%d.", job,
-            notStartedCount));
+        LOG.debug(String
+            .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount));
       }
       return false;
     }
@@ -132,8 +133,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
     if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
       markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Job %s is not ready to start, failedCount(s)=%d.", job,
-            failedOrTimeoutCount));
+        LOG.debug(String
+            .format("Job %s is not ready to start, failedCount(s)=%d.", job, failedOrTimeoutCount));
       }
       return false;
     }
@@ -164,7 +165,9 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
 
   /**
    * Check if a workflow is ready to schedule.
+   *
    * @param workflowCfg the workflow to check
+   *
    * @return true if the workflow is ready for schedule, false if not ready
    */
   protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
@@ -173,11 +176,10 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
     return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
   }
 
-  @Override
-  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+  @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
     // All of the heavy lifting is in the ResourceAssignment computation,
     // so this part can just be a no-op.
     return currentIdealState;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ded3aa2..1ce448c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -838,7 +838,9 @@ public class TaskUtil {
 
   /**
    * Check whether tasks are just started or still running
+   *
    * @param jobContext The job context
+   *
    * @return False if still tasks not in final state. Otherwise return true
    */
   public static boolean checkJobStopped(JobContext jobContext) {
@@ -851,8 +853,10 @@ public class TaskUtil {
     return true;
   }
 
+
   /**
    * Count the number of jobs in a workflow that are not in final state.
+   *
    * @param workflowCfg
    * @param workflowCtx
    * @return