You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/10/06 20:23:20 UTC
[helix] branch master updated: Fix currentState not being removed
when pending message exists (#1422)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 563a6c1 Fix currentState not being removed when pending message exists (#1422)
563a6c1 is described below
commit 563a6c1356ea62905f3318fbd145fc5d19419499
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Tue Oct 6 13:23:10 2020 -0700
Fix currentState not being removed when pending message exists (#1422)
This commit makes sure that the controller drops the current state of
the jobs that have just finished and have pending messages.
---
.../apache/helix/task/AbstractTaskDispatcher.java | 36 +++++++------
.../task/TestDropCurrentStateRunningTask.java | 62 ++++++++++++++++++++++
2 files changed, 82 insertions(+), 16 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 6e86f52..35c4c39 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -122,30 +122,37 @@ public abstract class AbstractTaskDispatcher {
Set<Integer> donePartitions = new TreeSet<>();
for (int pId : pSet) {
final String pName = pName(jobResource, pId);
- TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
+ TaskPartitionState currState = getTaskCurrentState(currStateOutput,
jobResource, pId, pName, instance, jobCtx, jobTgtState);
- if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
- LOG.warn(
- "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
- instance, pId);
- continue;
- }
-
// Check for pending state transitions on this (partition, instance). If there is a pending
// state transition, we prioritize this pending state transition and set the assignment from
// this pending state transition, essentially "waiting" until this pending message clears
+ // If there is a pending message, we should not continue to update the context because, from
+ // the controller's perspective, the state transition has not been completed yet while the
+ // pending message still exists.
+ // If the context gets updated here, the controller might remove the job from RunTimeJobDAG,
+ // which can cause the task's CurrentState not to be removed when there is a pending message
+ // for that task.
Message pendingMessage =
currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
- if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name())) {
- // If there is a pending message whose destination state is different from the current
- // state, just make the same assignment as the pending message. This is essentially
- // "waiting" until this state transition is complete
+ if (pendingMessage != null) {
processTaskWithPendingMessage(pId, pName, instance, pendingMessage, jobState, currState,
paMap, assignedPartitions);
continue;
}
+ // Update job context based on current state
+ updatePartitionInformationInJobContext(currStateOutput, jobResource, currState, jobCtx,
+ pId, pName, instance);
+
+ if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
+ LOG.warn(
+ "Instance {} does not match the assigned participant for pId {} in the job context. Skipping task scheduling.",
+ instance, pId);
+ continue;
+ }
+
// Get AssignableInstance for this instance and TaskConfig for releasing resources
String quotaType = jobCfg.getJobType();
String taskId;
@@ -365,7 +372,7 @@ public abstract class AbstractTaskDispatcher {
}
}
- private TaskPartitionState updateJobContextAndGetTaskCurrentState(
+ private TaskPartitionState getTaskCurrentState(
CurrentStateOutput currentStateOutput, String jobResource, Integer pId, String pName,
String instance, JobContext jobCtx, TargetState jobTgtState) {
String currentStateString =
@@ -389,9 +396,6 @@ public abstract class AbstractTaskDispatcher {
return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
}
TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
- // Update job context based on current state
- updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState, jobCtx,
- pId, pName, instance);
return currentState;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
index c29cc45..f3e3dcf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDropCurrentStateRunningTask.java
@@ -28,6 +28,7 @@ import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.CurrentState;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
@@ -47,6 +48,7 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
@BeforeClass
public void beforeClass() throws Exception {
_numNodes = 3;
+ _numPartitions = 1;
super.beforeClass();
// Stop participants that have been started in super class
@@ -146,5 +148,65 @@ public class TestDropCurrentStateRunningTask extends TaskTestBase {
&& _manager.getHelixDataAccessor().getBaseDataAccessor()
.get(currentStatePathP1, new Stat(), AccessOption.PERSISTENT) == null),
TestHelper.WAIT_DURATION));
+
+ _driver.waitToStop(workflowName, TestHelper.WAIT_DURATION);
+ }
+
+ @Test(dependsOnMethods = "testDropCurrentStateRunningTask")
+ public void testJobCurrentStateDroppedAfterCompletion() throws Exception {
+ // Stop the participants that were started in the previous test, then start one of them
+ for (int i = 0; i < _numNodes; i++) {
+ super.stopParticipant(i);
+ Assert.assertFalse(_participants[i].isConnected());
+ }
+ _participants = new MockParticipantManager[_numNodes];
+ startParticipant(0);
+
+ String jobQueueName = TestHelper.getTestMethodName();
+
+ JobConfig.Builder jobBuilderCompleted =
+ JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setMaxAttemptsPerTask(1)
+ .setWorkflow(jobQueueName)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10"));
+
+ // The task times out after 10 seconds because the default timeout value is 10 seconds in
+ // DEFAULT_JOB_CONFIG
+ JobConfig.Builder jobBuilderTimedOut =
+ JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setMaxAttemptsPerTask(1)
+ .setWorkflow(jobQueueName)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+
+ JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName, 0, 100);
+
+ for (int i = 0; i < 20; i++) {
+ jobQueue.enqueueJob("job" + i, jobBuilderCompleted);
+ }
+
+ jobQueue.enqueueJob("job" + 20, jobBuilderTimedOut);
+
+ _driver.start(jobQueue.build());
+
+ for (int i = 0; i < 20; i++) {
+ _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job" + i),
+ TaskState.COMPLETED);
+ }
+ _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job" + 20),
+ TaskState.FAILED);
+
+ String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+ ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+ String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+
+ for (int i = 0; i < 21; i++) {
+ String currentStatePathP0 =
+ "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0 + "/CURRENTSTATES/" + sessionIdP0 + "/"
+ + TaskUtil.getNamespacedJobName(jobQueueName, "job" + i);
+ boolean isCurrentStateRemoved = TestHelper.verify(() -> {
+ ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor()
+ .get(currentStatePathP0, new Stat(), AccessOption.PERSISTENT);
+ return record == null;
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isCurrentStateRemoved);
+ }
}
}