You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/09/21 22:14:59 UTC
helix git commit: Create AbstractTaskDispatcher for future Task Framework
Repository: helix
Updated Branches:
refs/heads/master b91d6eee4 -> 1b4e0bbb8
Create AbstractTaskDispatcher for future Task Framework
Refactor the existing code, moving the logic that will be needed in the future into AbstractTaskDispatcher.
RB=1308274
BUG=HELIX-985
G=helix-reviewers
A=hrzhang
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1b4e0bbb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1b4e0bbb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1b4e0bbb
Branch: refs/heads/master
Commit: 1b4e0bbb89736ea7e0bd92c1af98e1ab102397f1
Parents: b91d6ee
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri May 11 11:44:11 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Fri Sep 21 15:14:38 2018 -0700
----------------------------------------------------------------------
.../helix/task/AbstractTaskDispatcher.java | 5 +++-
.../org/apache/helix/task/JobRebalancer.java | 17 +++++++-----
.../org/apache/helix/task/TaskRebalancer.java | 28 +++++++++++---------
.../java/org/apache/helix/task/TaskUtil.java | 4 +++
4 files changed, 34 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index e230fb5..9a5b899 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -19,6 +19,7 @@ import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.task.assigner.AssignableInstance;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@ public abstract class AbstractTaskDispatcher {
// Check for pending state transitions on this (partition, instance).
Message pendingMessage =
currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
+
if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
processTaskWithPendingMessage(prevTaskToInstanceStateAssignment, pId, pName, instance,
pendingMessage, jobState, currState, paMap, assignedPartitions);
@@ -818,6 +820,7 @@ public abstract class AbstractTaskDispatcher {
incomplete = true;
}
}
+
if (!incomplete && cfg.isTerminable()) {
ctx.setWorkflowState(TaskState.COMPLETED);
return true;
@@ -925,4 +928,4 @@ public abstract class AbstractTaskDispatcher {
// This is a targeted task
return pName(jobCfg.getJobId(), partitionNum);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 5b29c23..4dbf0a0 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -33,6 +33,7 @@ import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
@@ -55,8 +56,9 @@ public class JobRebalancer extends TaskRebalancer {
private static final String PREV_RA_NODE = "PreviousResourceAssignment";
@Override
- public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
- IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
+ public ResourceAssignment computeBestPossiblePartitionState(
+ ClusterDataCache clusterData, IdealState taskIs, Resource resource,
+ CurrentStateOutput currStateOutput) {
final String jobName = resource.getResourceName();
LOG.debug("Computer Best Partition for job: " + jobName);
@@ -258,6 +260,7 @@ public class JobRebalancer extends TaskRebalancer {
|| (jobCfg.getTargetResource() != null
&& cache.getIdealState(jobCfg.getTargetResource()) != null
&& !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
+
if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
return buildEmptyAssignment(jobResource, currStateOutput);
@@ -340,8 +343,8 @@ public class JobRebalancer extends TaskRebalancer {
Message pendingMessage = currentStateOutput.getPendingMessage(jobResource, partition,
instance);
// If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
- if (state == TaskPartitionState.RUNNING
- || (state == TaskPartitionState.INIT && pendingMessage != null)) {
+ if (state == TaskPartitionState.RUNNING || (state == TaskPartitionState.INIT
+ && pendingMessage != null)) {
return false;
}
}
@@ -351,6 +354,7 @@ public class JobRebalancer extends TaskRebalancer {
/**
* Get the last task assignment for a given job
* @param resourceName the name of the job
+ *
* @return {@link ResourceAssignment} instance, or null if no assignment is available
*/
private ResourceAssignment getPrevResourceAssignment(String resourceName) {
@@ -395,8 +399,9 @@ public class JobRebalancer extends TaskRebalancer {
/**
* @param liveInstances
- * @param prevAssignment task partition -> (instance -> state)
+ * @param prevAssignment task partition -> (instance -> state)
* @param allTaskPartitions all task partitionIds
+ *
* @return instance -> partitionIds from previous assignment, if the instance is still live
*/
private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
@@ -438,4 +443,4 @@ public class JobRebalancer extends TaskRebalancer {
}
return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index dae0da6..2f88e24 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -47,26 +47,25 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
implements Rebalancer, MappingCalculator {
private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class);
- @Override
- public void init(HelixManager manager) {
+ @Override public void init(HelixManager manager) {
_manager = manager;
}
- @Override
- public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
+ @Override public abstract ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput);
/**
* Checks if the workflow has been stopped.
+ *
* @param ctx Workflow context containing task states
* @param cfg Workflow config containing set of tasks
+ *
* @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
*/
protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
for (String job : cfg.getJobDag().getAllNodes()) {
TaskState jobState = ctx.getJobState(job);
- if (jobState != null
- && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) {
+ if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || jobState.equals(TaskState.STOPPING))) {
return false;
}
}
@@ -90,9 +89,11 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
/**
* Check all the dependencies of a job to determine whether the job is ready to be scheduled.
+ *
* @param job
* @param workflowCfg
* @param workflowCtx
+ *
* @return
*/
protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg,
@@ -116,8 +117,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
// If there is any parent job not started, this job should not be scheduled
if (notStartedCount > 0) {
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Job %s is not ready to start, notStartedParent(s)=%d.", job,
- notStartedCount));
+ LOG.debug(String
+ .format("Job %s is not ready to start, notStartedParent(s)=%d.", job, notStartedCount));
}
return false;
}
@@ -132,8 +133,8 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache);
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Job %s is not ready to start, failedCount(s)=%d.", job,
- failedOrTimeoutCount));
+ LOG.debug(String
+ .format("Job %s is not ready to start, failedCount(s)=%d.", job, failedOrTimeoutCount));
}
return false;
}
@@ -164,7 +165,9 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
/**
* Check if a workflow is ready to schedule.
+ *
* @param workflowCfg the workflow to check
+ *
* @return true if the workflow is ready for schedule, false if not ready
*/
protected boolean isWorkflowReadyForSchedule(WorkflowConfig workflowCfg) {
@@ -173,11 +176,10 @@ public abstract class TaskRebalancer extends AbstractTaskDispatcher
return (startTime == null || startTime.getTime() <= System.currentTimeMillis());
}
- @Override
- public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+ @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
// All of the heavy lifting is in the ResourceAssignment computation,
// so this part can just be a no-op.
return currentIdealState;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/1b4e0bbb/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index ded3aa2..1ce448c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -838,7 +838,9 @@ public class TaskUtil {
/**
* Check whether tasks are just started or still running
+ *
* @param jobContext The job context
+ *
* @return False if still tasks not in final state. Otherwise return true
*/
public static boolean checkJobStopped(JobContext jobContext) {
@@ -851,8 +853,10 @@ public class TaskUtil {
return true;
}
+
/**
* Count the number of jobs in a workflow that are not in final state.
+ *
* @param workflowCfg
* @param workflowCtx
* @return