You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/11/25 21:19:28 UTC
[helix] branch master updated: Fix targeted job quota calculation
for given up tasks (#1548)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new d5941ca Fix targeted job quota calculation for given up tasks (#1548)
d5941ca is described below
commit d5941caea82a614113c93dc50ae01965e20aa90e
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Wed Nov 25 13:19:19 2020 -0800
Fix targeted job quota calculation for given up tasks (#1548)
In this commit, tasks that should not be retried again no longer occupy
any quota. This avoids jobs being blocked by the quota usage of given-up
tasks.
---
.../apache/helix/task/AbstractTaskDispatcher.java | 13 +++++-----
.../org/apache/helix/task/WorkflowDispatcher.java | 2 +-
...skQuota.java => TestTaskQuotaCalculations.java} | 30 +++++++++++++++++++++-
3 files changed, 37 insertions(+), 8 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 8f112e9..76359fe 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -569,7 +569,8 @@ public abstract class AbstractTaskDispatcher {
excludeSet.addAll(partitionsWithDelay);
// The following is filtering of tasks before passing them to the assigner
- // Only feed in tasks that need to be assigned (null and STOPPED)
+ // Only feed in tasks that need to be assigned (have state equal to null, STOPPED, TIMED_OUT,
+ // TASK_ERROR, or DROPPED) or their assigned participant is not live anymore
Set<Integer> filteredTaskPartitionNumbers = filterTasks(allPartitions, jobCtx, liveInstances);
// Remove all excludeSet tasks to be safer because some STOPPED tasks have been already
// re-started (excludeSet includes already-assigned partitions). Also tasks with their retry
@@ -587,13 +588,13 @@ public abstract class AbstractTaskDispatcher {
for (int partitionNum : allPartitions) {
TaskPartitionState taskPartitionState = jobCtx.getPartitionState(partitionNum);
if (isTaskNotInTerminalState(taskPartitionState)
- && !partitionsWithDelay.contains(partitionNum)) {
+ && !partitionsWithDelay.contains(partitionNum)
+ && !isTaskGivenup(jobCtx, jobCfg, partitionNum)) {
// Some targeted tasks may have timed-out due to Participants (instances) not being
// live, so we give tasks like these another try
// If some of these tasks are already scheduled and running, they will be dropped as
- // well
- // Also, do not include partitions with delay that are not ready to be assigned and
- // scheduled
+ // well. Also, do not include partitions with delay that are not ready to be assigned and
+ // scheduled and the partitions that cannot be retried (given up)
partitionsToRetryOnLiveInstanceChangeForTargetedJob.add(partitionNum);
}
}
@@ -838,7 +839,7 @@ public abstract class AbstractTaskDispatcher {
*/
private boolean isTaskNotInTerminalState(TaskPartitionState state) {
return state != TaskPartitionState.COMPLETED && state != TaskPartitionState.TASK_ABORTED
- && state != TaskPartitionState.DROPPED && state != TaskPartitionState.ERROR;
+ && state != TaskPartitionState.ERROR;
}
protected static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index fd482bb..b8f0e67 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -317,7 +317,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
updateBestPossibleStateOutput(job, resourceAssignment, bestPossibleOutput);
} catch (Exception e) {
LogUtil.logWarn(LOG, _clusterDataCache.getClusterEventId(),
- String.format("Failed to compute job assignment for job %s", job, e));
+ String.format("Failed to compute job assignment for job %s", job), e);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskQuotaCalculations.java
similarity index 86%
rename from helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
rename to helix-core/src/test/java/org/apache/helix/integration/task/TestTaskQuotaCalculations.java
index b8da096..9cb24e5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStuckTaskQuota.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskQuotaCalculations.java
@@ -27,6 +27,7 @@ import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskPartitionState;
@@ -42,12 +43,13 @@ import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
-public class TestStuckTaskQuota extends TaskTestBase {
+public class TestTaskQuotaCalculations extends TaskTestBase {
private CountDownLatch latch = new CountDownLatch(1);
@BeforeClass
public void beforeClass() throws Exception {
_numNodes = 2;
+ _numPartitions = 100;
super.beforeClass();
// Stop participants that have been started in super class
@@ -153,6 +155,32 @@ public class TestStuckTaskQuota extends TaskTestBase {
_driver.stop(workflowName3);
}
+ @Test(dependsOnMethods = "testStuckTaskQuota")
+ public void testTaskErrorMaxRetriesQuotaRelease() throws Exception {
+ for (int i = 0; i < _numNodes; i++) {
+ super.stopParticipant(i);
+ Assert.assertFalse(_participants[i].isConnected());
+ }
+ _participants = new MockParticipantManager[_numNodes];
+
+ // Start only one participant
+ startParticipantAndRegisterNewMockTask(0);
+
+ String jobQueueName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+ JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+ .setMaxAttemptsPerTask(2).setWorkflow(jobQueueName).setFailureThreshold(100000)
+ .setJobCommandConfigMap(
+ ImmutableMap.of(MockTask.JOB_DELAY, "10", MockTask.FAILURE_COUNT_BEFORE_SUCCESS, "10"));
+
+ JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+ jobQueue.enqueueJob(jobName, jobBuilder);
+
+ _driver.start(jobQueue.build());
+ _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, jobName),
+ TaskState.COMPLETED);
+ }
+
private void startParticipantAndRegisterNewMockTask(int participantIndex) {
Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
taskFactoryReg.put(NewMockTask.TASK_COMMAND, NewMockTask::new);