You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/06/22 22:57:33 UTC
[14/50] [abbrv] helix git commit: Support cancel tasks with
synchronized check task status
Support canceling tasks with a synchronized task status check
Currently, in Helix, canceling or stopping a job does not check the status of its subtasks. In this rb:
1. Add a new API to support synchronously stopping a workflow/queue.
2. On the controller side, check that subtasks are stopped before marking the job status.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/470b514f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/470b514f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/470b514f
Branch: refs/heads/master
Commit: 470b514f3cc1f6d879938112bc862ab2ba22378d
Parents: 42273bb
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Feb 9 14:06:57 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Feb 9 14:06:57 2017 -0800
----------------------------------------------------------------------
.../org/apache/helix/task/JobRebalancer.java | 49 ++++++++++++++++----
.../java/org/apache/helix/task/TaskDriver.java | 36 +++++++++++++-
.../org/apache/helix/task/TaskRebalancer.java | 5 +-
.../java/org/apache/helix/task/TaskState.java | 5 +-
.../apache/helix/integration/task/MockTask.java | 7 ++-
.../task/TestTaskRebalancerStopResume.java | 42 +++++++++++++++++
6 files changed, 128 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index bd7e819..bed81cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -19,15 +19,6 @@ package org.apache.helix.task;
* under the License.
*/
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import org.apache.helix.*;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.*;
-import org.apache.log4j.Logger;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -40,6 +31,23 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
/**
* Custom rebalancer implementation for the {@code Job} in task model.
*/
@@ -191,10 +199,16 @@ public class JobRebalancer extends TaskRebalancer {
TargetState jobTgtState = workflowConfig.getTargetState();
// Update running status in workflow context
if (jobTgtState == TargetState.STOP) {
- workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+ if (checkJobStopped(jobCtx)) {
+ workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+ } else {
+ workflowCtx.setJobState(jobResource, TaskState.STOPPING);
+ }
// Workflow has been stopped if all in progress jobs are stopped
if (isWorkflowStopped(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
+ } else {
+ workflowCtx.setWorkflowState(TaskState.STOPPING);
}
} else {
workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
@@ -657,6 +671,21 @@ public class JobRebalancer extends TaskRebalancer {
}
/**
+ * Check whether the job has stopped, i.e. none of its tasks is still in RUNNING state
+ * @param jobContext The job context
+ * @return False if any task is still in RUNNING state. Otherwise return true
+ */
+ private boolean checkJobStopped(JobContext jobContext) {
+ for (int partition : jobContext.getPartitionSet()) {
+ TaskPartitionState taskState = jobContext.getPartitionState(partition);
+ if (taskState != null && taskState.equals(TaskPartitionState.RUNNING)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* Computes the partition name given the resource name and partition id.
*/
private String pName(String resource, int pId) {
http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 5e39e17..c922b18 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -692,12 +692,44 @@ public class TaskDriver {
}
/**
- * Public method to stop a workflow/queue.
+ * Public async method to stop a workflow/queue.
+ *
+ * This call only sends the STOP command to Helix; it does not check
+ * whether the workflow (all jobs) has been stopped yet.
*
* @param workflow
*/
- public void stop(String workflow) {
+ public void stop(String workflow) throws InterruptedException {
+ setWorkflowTargetState(workflow, TargetState.STOP);
+ }
+
+ /**
+ * Public sync method to stop a workflow/queue with timeout
+ *
+ * The workflow and all of its jobs have been stopped if this method returns successfully.
+ *
+ * @param workflow The workflow name
+ * @param timeout The timeout for stopping the workflow/queue, in milliseconds
+ */
+ public void waitToStop(String workflow, long timeout) throws InterruptedException {
setWorkflowTargetState(workflow, TargetState.STOP);
+ long endTime = System.currentTimeMillis() + timeout;
+
+ while (System.currentTimeMillis() <= endTime) {
+ WorkflowContext workflowContext = getWorkflowContext(workflow);
+
+ if (workflowContext == null || !workflowContext.getWorkflowState()
+ .equals(TaskState.STOPPED)) {
+ Thread.sleep(1000);
+ } else {
+ // Successfully stopped
+ return;
+ }
+ }
+
+ // Failed to stop with timeout
+ throw new HelixException(String
+ .format("Fail to stop the workflow/queue %s with in %d milliseconds.", workflow, timeout));
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index d4ac1b8..27741ca 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -114,9 +114,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
protected boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
for (String job : cfg.getJobDag().getAllNodes()) {
TaskState jobState = ctx.getJobState(job);
- if (jobState != null && jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
- && jobState != TaskState.STOPPED)
+ if (jobState != null && (jobState.equals(TaskState.IN_PROGRESS) || jobState
+ .equals(TaskState.STOPPING))) {
return false;
+ }
}
return true;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 1000a9b..4e12f2d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -27,7 +27,6 @@ public enum TaskState {
* The task has yet to start
*/
NOT_STARTED,
-
/**
* The task is in progress.
*/
@@ -37,6 +36,10 @@ public enum TaskState {
*/
STOPPED,
/**
+ * The task is in the process of stopping. Stopping finishes once its subtasks are stopped or completed
+ */
+ STOPPING,
+ /**
* The task has failed. It cannot be resumed.
*/
FAILED,
http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 948e8f3..0502f8e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -36,7 +36,9 @@ public class MockTask extends UserContentStore implements Task {
public static final String ERROR_MESSAGE = "ErrorMessage";
public static final String FAILURE_COUNT_BEFORE_SUCCESS = "FailureCountBeforeSuccess";
public static final String SUCCESS_COUNT_BEFORE_FAIL = "SuccessCountBeforeFail";
+ public static final String NOT_ALLOW_TO_CANCEL = "NotAllowToCancel";
private final long _delay;
+ private volatile boolean _notAllowToCancel;
private volatile boolean _canceled;
private TaskResult.Status _taskResultStatus;
private boolean _throwException;
@@ -57,6 +59,9 @@ public class MockTask extends UserContentStore implements Task {
}
_delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 100L;
+ _notAllowToCancel = cfg.containsKey(NOT_ALLOW_TO_CANCEL)
+ ? Boolean.parseBoolean(cfg.get(NOT_ALLOW_TO_CANCEL))
+ : false;
_taskResultStatus = cfg.containsKey(TASK_RESULT_STATUS) ?
TaskResult.Status.valueOf(cfg.get(TASK_RESULT_STATUS)) :
TaskResult.Status.COMPLETED;
@@ -77,7 +82,7 @@ public class MockTask extends UserContentStore implements Task {
long expiry = System.currentTimeMillis() + _delay;
long timeLeft;
while (System.currentTimeMillis() < expiry) {
- if (_canceled) {
+ if (_canceled && !_notAllowToCancel) {
timeLeft = expiry - System.currentTimeMillis();
return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
: timeLeft));
http://git-wip-us.apache.org/repos/asf/helix/blob/470b514f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index bc02148..2b0d38c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration.task;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,8 +37,10 @@ import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.ScheduleConfig;
+import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
@@ -46,6 +49,7 @@ import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -479,6 +483,44 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
System.out.println("END " + queueName + " at " + new Date(System.currentTimeMillis()));
}
+ @Test
+ public void testStopWorkflowInStoppingState() throws InterruptedException {
+ final String workflowName = TestHelper.getTestMethodName();
+
+ // Create a workflow
+ Workflow.Builder builder = new Workflow.Builder(workflowName);
+
+ // Add 2 jobs
+ Map<String, String> jobCommandConfigMap = new HashMap<String, String>();
+ jobCommandConfigMap.put(MockTask.TIMEOUT_CONFIG, "1000000");
+ jobCommandConfigMap.put(MockTask.NOT_ALLOW_TO_CANCEL, String.valueOf(true));
+ List<TaskConfig> taskConfigs = ImmutableList
+ .of(new TaskConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTaskId("testTask")
+ .build());
+ JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+ .addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandConfigMap);
+ String job1Name = "Job1";
+
+ JobConfig.Builder job2 =
+ new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).addTaskConfigs(taskConfigs);
+ String job2Name = "Job2";
+
+ builder.addJob(job1Name, job1);
+ builder.addJob(job2Name, job2);
+
+ _driver.start(builder.build());
+ Thread.sleep(2000);
+ _driver.stop(workflowName);
+ _driver.pollForWorkflowState(workflowName, TaskState.STOPPING);
+
+ // Expect job and workflow stuck in STOPPING state.
+ WorkflowContext workflowContext = _driver.getWorkflowContext(workflowName);
+ Assert.assertEquals(
+ workflowContext.getJobState(TaskUtil.getNamespacedJobName(workflowName, job1Name)),
+ TaskState.STOPPING);
+ }
+
private void verifyJobDeleted(String queueName, String jobName) throws Exception {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();