You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/13 00:04:37 UTC

[3/6] helix git commit: Helix job should fail immediately if the target resource is disabled

Helix job should fail immediately if the target resource is disabled

For targeted jobs, once the target resource has been disabled, Helix should not keep scheduling the job; it should fail the job instead. If the job is already in progress, we can mark it as failed, but tasks that are already running will not actually be cancelled.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/68fe74e6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/68fe74e6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/68fe74e6

Branch: refs/heads/master
Commit: 68fe74e696c92d332627935148f645f64b42d389
Parents: eac4940
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu May 31 17:55:36 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Jul 12 16:53:24 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/JobRebalancer.java    |  6 ++-
 .../TestFailTargetJobWhenResourceDisabled.java  | 53 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/68fe74e6/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 0f09166..11abb25 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -258,8 +258,10 @@ public class JobRebalancer extends TaskRebalancer {
 
     addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
-    if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg
-        .getFailureThreshold()) {
+    if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()
+        || (jobCfg.getTargetResource() != null
+        && cache.getIdealState(jobCfg.getTargetResource()) != null && !cache
+        .getIdealState(jobCfg.getTargetResource()).isEnabled())) {
       if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
         failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap());
         return buildEmptyAssignment(jobResource, currStateOutput);

http://git-wip-us.apache.org/repos/asf/helix/blob/68fe74e6/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java
new file mode 100644
index 0000000..676c053
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestFailTargetJobWhenResourceDisabled.java
@@ -0,0 +1,53 @@
+package org.apache.helix.integration.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/** Checks that a job whose target resource is disabled drives its workflow to FAILED. */
+public class TestFailTargetJobWhenResourceDisabled extends TaskTestBase {
+  private JobConfig.Builder _jobCfg; // shared job template; it targets DEFAULT_TGT_DB
+  private String _jobName;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+    _jobName = "TestJob";
+    _jobCfg = new JobConfig.Builder().setJobId(_jobName).setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB);
+  }
+
+  @Test // a job submitted while its target resource is disabled must end up FAILED
+  public void testJobScheduleAfterResourceDisabled() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    _gSetupTool.getClusterManagementTool()
+        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false);
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    workflow.addJob(_jobName, _jobCfg);
+    _driver.start(workflow.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.FAILED);
+  }
+
+  @Test // a job that is already scheduled must end up FAILED once its target is disabled
+  public void testJobScheduleBeforeResourceDisabled() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    _gSetupTool.getClusterManagementTool() // the other test leaves the target disabled; reset
+        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, true);
+    Workflow.Builder workflow = new Workflow.Builder(workflowName);
+    _jobCfg.setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000000"));
+    workflow.addJob(_jobName, _jobCfg);
+    _driver.start(workflow.build());
+    Thread.sleep(1000);
+    _gSetupTool.getClusterManagementTool()
+        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false);
+    _driver.pollForWorkflowState(workflowName, TaskState.FAILED);
+  }
+}