Posted to commits@helix.apache.org by al...@apache.org on 2020/11/25 21:19:28 UTC

[helix] branch master updated: Fix targeted job quota calculation for given up tasks (#1548)

This is an automated email from the ASF dual-hosted git repository.

alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new d5941ca  Fix targeted job quota calculation for given up tasks (#1548)
d5941ca is described below

commit d5941caea82a614113c93dc50ae01965e20aa90e
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Nov 25 13:19:19 2020 -0800

    Fix targeted job quota calculation for given up tasks (#1548)
    
    In this commit, tasks that should not be retried again will no longer occupy
    any quota. This avoids jobs being blocked by the quota usage of given-up
    tasks.
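
As a rough illustration of the intended accounting (the class and field names below are simplified stand-ins, not the actual Helix task framework API), a task should count against the job's quota only while it can still be retried:

    import java.util.Map;

    // Illustrative sketch only; not the actual Helix implementation.
    class QuotaAccountingSketch {
      // Hypothetical stand-in for the per-task state tracked in JobContext.
      static class TaskInfo {
        int numAttempts;
        boolean aborted;
      }

      // A task occupies quota only while it may still be retried. Aborted tasks and
      // tasks that have exhausted maxAttemptsPerTask are "given up" and release quota.
      static int occupiedQuota(Map<Integer, TaskInfo> tasks, int maxAttemptsPerTask) {
        int occupied = 0;
        for (TaskInfo task : tasks.values()) {
          boolean givenUp = task.aborted || task.numAttempts >= maxAttemptsPerTask;
          if (!givenUp) {
            occupied++;
          }
        }
        return occupied;
      }
    }

With accounting along these lines, a job whose failed tasks have hit their retry limit no longer appears to be holding capacity, which is the situation the new testTaskErrorMaxRetriesQuotaRelease test in the diff below exercises.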
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 13 +++++-----
 .../org/apache/helix/task/WorkflowDispatcher.java  |  2 +-
 ...skQuota.java => TestTaskQuotaCalculations.java} | 30 +++++++++++++++++++++-
 3 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 8f112e9..76359fe 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -569,7 +569,8 @@ public abstract class AbstractTaskDispatcher {
     excludeSet.addAll(partitionsWithDelay);
 
     // The following is filtering of tasks before passing them to the assigner
-    // Only feed in tasks that need to be assigned (null and STOPPED)
+    // Only feed in tasks that need to be assigned (have state equal to null, STOPPED, TIMED_OUT,
+    // TASK_ERROR, or DROPPED) or their assigned participant is not live anymore
     Set<Integer> filteredTaskPartitionNumbers = filterTasks(allPartitions, jobCtx, liveInstances);
     // Remove all excludeSet tasks to be safer because some STOPPED tasks have been already
     // re-started (excludeSet includes already-assigned partitions). Also tasks with their retry
@@ -587,13 +588,13 @@ public abstract class AbstractTaskDispatcher {
       for (int partitionNum : allPartitions) {
         TaskPartitionState taskPartitionState = jobCtx.getPartitionState(partitionNum);
         if (isTaskNotInTerminalState(taskPartitionState)
-            && !partitionsWithDelay.contains(partitionNum)) {
+            && !partitionsWithDelay.contains(partitionNum)
+            && !isTaskGivenup(jobCtx, jobCfg, partitionNum)) {
           // Some targeted tasks may have timed-out due to Participants (instances) not being
           // live, so we give tasks like these another try
           // If some of these tasks are already scheduled and running, they will be dropped as
-          // well
-          // Also, do not include partitions with delay that are not ready to be assigned and
-          // scheduled
+          // well. Also, do not include partitions with delay that are not ready to be assigned and
+          // scheduled and the partitions that cannot be retried (given up)
           partitionsToRetryOnLiveInstanceChangeForTargetedJob.add(partitionNum);
         }
       }
@@ -838,7 +839,7 @@ public abstract class AbstractTaskDispatcher {
    */
   private boolean isTaskNotInTerminalState(TaskPartitionState state) {
     return state != TaskPartitionState.COMPLETED && state != TaskPartitionState.TASK_ABORTED
-        && state != TaskPartitionState.DROPPED && state != TaskPartitionState.ERROR;
+        && state != TaskPartitionState.ERROR;
   }
 
   protected static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index fd482bb..b8f0e67 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -317,7 +317,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
       updateBestPossibleStateOutput(job, resourceAssignment, bestPossibleOutput);
     } catch (Exception e) {
       LogUtil.logWarn(LOG, _clusterDataCache.getClusterEventId(),
-          String.format("Failed to compute job assignment for job %s", job, e));
+          String.format("Failed to compute job assignment for job %s", job), e);
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskQuotaCalculations.java
similarity index 86%
rename from helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
rename to helix-core/src/test/java/org/apache/helix/integration/task/TestTaskQuotaCalculations.java
index b8da096..9cb24e5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskQuotaCalculations.java
@@ -27,6 +27,7 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskPartitionState;
@@ -42,12 +43,13 @@ import org.testng.annotations.Test;
 import com.google.common.collect.ImmutableMap;
 
 
-public class TestStuckTaskQuota extends TaskTestBase {
+public class TestTaskQuotaCalculations extends TaskTestBase {
   private CountDownLatch latch = new CountDownLatch(1);
 
   @BeforeClass
   public void beforeClass() throws Exception {
     _numNodes = 2;
+    _numPartitions = 100;
     super.beforeClass();
 
     // Stop participants that have been started in super class
@@ -153,6 +155,32 @@ public class TestStuckTaskQuota extends TaskTestBase {
     _driver.stop(workflowName3);
   }
 
+  @Test(dependsOnMethods = "testStuckTaskQuota")
+  public void testTaskErrorMaxRetriesQuotaRelease() throws Exception {
+    for (int i = 0; i < _numNodes; i++) {
+      super.stopParticipant(i);
+      Assert.assertFalse(_participants[i].isConnected());
+    }
+    _participants = new MockParticipantManager[_numNodes];
+
+    // Start only one participant
+    startParticipantAndRegisterNewMockTask(0);
+
+    String jobQueueName = TestHelper.getTestMethodName();
+    String jobName = "JOB0";
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(2).setWorkflow(jobQueueName).setFailureThreshold(100000)
+        .setJobCommandConfigMap(
+            ImmutableMap.of(MockTask.JOB_DELAY, "10", MockTask.FAILURE_COUNT_BEFORE_SUCCESS, "10"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob(jobName, jobBuilder);
+
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, jobName),
+        TaskState.COMPLETED);
+  }
+
   private void startParticipantAndRegisterNewMockTask(int participantIndex) {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
     taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new);