You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2021/09/17 19:56:29 UTC
[helix] branch master updated: Fix adding a task to a job after
deleting old tasks (#1875)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 6e07a02 Fix adding a task to a job after deleting old tasks (#1875)
6e07a02 is described below
commit 6e07a024796ebc1a86c10fa10c7bb799a716b393
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Sep 17 12:56:20 2021 -0700
Fix adding a task to a job after deleting old tasks (#1875)
In this commit, the issue of dynamically adding a task to a job
in which some of its tasks have been deleted before is being
addressed.
---
.../task/GenericTaskAssignmentCalculator.java | 11 +--
.../helix/task/TaskAssignmentCalculator.java | 36 ++++++++
.../ThreadCountBasedTaskAssignmentCalculator.java | 12 +--
.../helix/integration/task/TestAddDeleteTask.java | 100 +++++++++++++++++++++
4 files changed, 139 insertions(+), 20 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index 987f2a4..0f2ad13 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -53,16 +53,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Map<String, IdealState> idealStateMap) {
- Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
- Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
- for (TaskConfig taskCfg : taskMap.values()) {
- String taskId = taskCfg.getId();
- int nextPartition = jobCtx.getPartitionSet().size();
- if (!taskIdMap.containsKey(taskId)) {
- jobCtx.setTaskIdForPartition(nextPartition, taskId);
- }
- }
- return jobCtx.getPartitionSet();
+ return getAllTaskPartitionsDefault(jobCfg, jobCtx);
}
@Override
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index ac87af2..e7491b7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
*/
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -121,4 +122,39 @@ public abstract class TaskAssignmentCalculator {
}
return deletedPartitions;
}
+
+ /**
+ * Get all the partitions/tasks that belong to the non-targeted job.
+ * @param jobCfg the job configuration
+ * @param jobCtx the job context
+ * @return set of partition numbers
+ */
+ public Set<Integer> getAllTaskPartitionsDefault(JobConfig jobCfg, JobContext jobCtx) {
+ Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
+ Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
+ // Check if a gap exists in the context due to previously
+ // removed tasks. If yes, the missing pIDs should be considered
+ // for newly added tasks
+ Set<Integer> existingPartitions = jobCtx.getPartitionSet();
+ Set<Integer> missingPartitions = new HashSet<>();
+ if (existingPartitions.size() != 0) {
+ for (int pId = 0; pId < Collections.max(existingPartitions); pId++) {
+ if (!existingPartitions.contains(pId)) {
+ missingPartitions.add(pId);
+ }
+ }
+ }
+ for (TaskConfig taskCfg : taskMap.values()) {
+ String taskId = taskCfg.getId();
+ if (!taskIdMap.containsKey(taskId)) {
+ int nextPartition = jobCtx.getPartitionSet().size();
+ if (missingPartitions.size()!=0) {
+ nextPartition = missingPartitions.iterator().next();
+ missingPartitions.remove(nextPartition);
+ }
+ jobCtx.setTaskIdForPartition(nextPartition, taskId);
+ }
+ }
+ return jobCtx.getPartitionSet();
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
index b3d85e1..f8a4106 100644
--- a/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/ThreadCountBasedTaskAssignmentCalculator.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
*/
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -63,16 +64,7 @@ public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalc
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Map<String, IdealState> idealStateMap) {
- Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
- Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
- for (TaskConfig taskCfg : taskMap.values()) {
- String taskId = taskCfg.getId();
- int nextPartition = jobCtx.getPartitionSet().size();
- if (!taskIdMap.containsKey(taskId)) {
- jobCtx.setTaskIdForPartition(nextPartition, taskId);
- }
- }
- return jobCtx.getPartitionSet();
+ return getAllTaskPartitionsDefault(jobCfg, jobCtx);
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
index d99707f..e432a07 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestAddDeleteTask.java
@@ -610,6 +610,106 @@ public class TestAddDeleteTask extends TaskTestBase {
}
@Test(dependsOnMethods = "testDeleteTaskAndJobCompleted")
+ public void testDeleteMiddleTaskAndAdd() throws Exception {
+ String workflowName = TestHelper.getTestMethodName();
+ String jobName = "JOB0";
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "20000"));
+
+ Workflow.Builder workflowBuilder1 =
+ new Workflow.Builder(workflowName).addJob(jobName, jobBuilder1);
+ _driver.start(workflowBuilder1.build());
+
+ _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, jobName),
+ TaskState.IN_PROGRESS);
+
+ // Wait until initial task goes to RUNNING state
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ if (jobContext == null) {
+ return false;
+ }
+ TaskPartitionState state = jobContext.getPartitionState(0);
+ if (state == null) {
+ return false;
+ }
+ return (state == TaskPartitionState.RUNNING);
+ }, TestHelper.WAIT_DURATION));
+
+ // Only one task (initial task) should be included in the job
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ return (jobContext.getPartitionSet().size() == 1);
+ }, TestHelper.WAIT_DURATION));
+
+ // Add new tasks
+ Map<String, String> taskConfig1 =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+ Map<String, String> taskConfig2 =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+ Map<String, String> taskConfig3 =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+ Map<String, String> taskConfig4 =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+
+ TaskConfig task1 = new TaskConfig(null, taskConfig1, null, null);
+ TaskConfig task2 = new TaskConfig(null, taskConfig2, null, null);
+ TaskConfig task3 = new TaskConfig(null, taskConfig3, null, null);
+ TaskConfig task4 = new TaskConfig(null, taskConfig4, null, null);
+
+ _driver.addTask(workflowName, jobName, task1);
+ _driver.addTask(workflowName, jobName, task2);
+ _driver.addTask(workflowName, jobName, task3);
+ _driver.addTask(workflowName, jobName, task4);
+
+ // 5 tasks should be included in the job
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ return (jobContext.getPartitionSet().size() == 5);
+ }, TestHelper.WAIT_DURATION));
+
+ // All tasks should be in the RUNNING state
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ int runningTasks = 0;
+ for (Integer pId: jobContext.getPartitionSet()) {
+ if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING) {
+ runningTasks++;
+ }
+ }
+ return (runningTasks == 5);
+ }, TestHelper.WAIT_DURATION));
+
+ _driver.deleteTask(workflowName, jobName, task3.getId());
+
+ // Since one of the tasks had been deleted, we should expect 4 tasks in the context
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ return (jobContext.getPartitionSet().size() == 4);
+ }, TestHelper.WAIT_DURATION));
+
+ // Add new tasks and make sure the task is being added to context
+ Map<String, String> taskConfig5 =
+ new HashMap<String, String>(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
+ TaskConfig task5 = new TaskConfig(null, taskConfig5, null, null);
+ _driver.addTask(workflowName, jobName, task5);
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ JobContext jobContext =
+ _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, jobName));
+ return (jobContext.getPartitionSet().size() == 5);
+ }, TestHelper.WAIT_DURATION));
+ _driver.stop(workflowName);
+ }
+
+ @Test(dependsOnMethods = "testDeleteTaskAndJobCompleted")
public void testPartitionDropTargetedJob() throws Exception {
String workflowName = TestHelper.getTestMethodName();
String jobName = "JOB0";