You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by al...@apache.org on 2020/08/29 15:01:20 UTC
[helix] branch master updated: Continue task pipeline if
currentState is null (#1329)
This is an automated email from the ASF dual-hosted git repository.
alizamus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new ef520ac Continue task pipeline if currentState is null (#1329)
ef520ac is described below
commit ef520ac0b93296db4401bcf22519954ba51a2ea8
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Sat Aug 29 08:01:09 2020 -0700
Continue task pipeline if currentState is null (#1329)
In this commit, if controller encounters null currentStates, it will
continue the pipeline and TaskSchedulingStage. Also such
currentState would not count toward the quota.
---
.../helix/task/AssignableInstanceManager.java | 10 +-
.../integration/task/TestTaskCurrentStateNull.java | 120 +++++++++++++++++++++
2 files changed, 127 insertions(+), 3 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index d9092ec..88a12e9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -238,8 +238,12 @@ public class AssignableInstanceManager {
String taskState = instanceCurrentStateEntry.getValue();
// If a task in in INIT or RUNNING state on the instance, this task should occupy one
// quota from this instance.
- if (taskState.equals(TaskPartitionState.INIT.name())
- || taskState.equals(TaskPartitionState.RUNNING.name())) {
+ if (taskState == null) {
+ LOG.warn("CurrentState is null for job {}, task {} on instance {}", resourceName,
+ taskId, assignedInstance);
+ }
+ if (TaskPartitionState.INIT.name().equals(taskState)
+ || TaskPartitionState.RUNNING.name().equals(taskState)) {
assignTaskToInstance(assignedInstance, jobConfig, taskId, quotaType);
}
}
@@ -256,7 +260,7 @@ public class AssignableInstanceManager {
String messageToState = instancePendingMessageEntry.getValue().getToState();
// If there is a pending message on the instance which has ToState of RUNNING, the task
// will run on the instance soon. So the task needs to occupy one quota on this instance.
- if (messageToState.equals(TaskPartitionState.RUNNING.name())
+ if (TaskPartitionState.RUNNING.name().equals(messageToState)
&& !TaskPartitionState.INIT.name().equals(
currentStateOutput.getCurrentState(resourceName, partition, assignedInstance))
&& !TaskPartitionState.RUNNING.name().equals(currentStateOutput
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
new file mode 100644
index 0000000..720e7a3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskCurrentStateNull.java
@@ -0,0 +1,120 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * This test makes sure that controller will not be blocked if there exists null current states.
+ */
+public class TestTaskCurrentStateNull extends TaskTestBase {
+ protected HelixDataAccessor _accessor;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _numPartitions = 1;
+ _numNodes = 1;
+ super.beforeClass();
+ }
+
+ @AfterClass()
+ public void afterClass() throws Exception {
+ super.afterClass();
+ }
+
+ @Test
+ public void testCurrentStateNull() throws Exception {
+ String workflowName1 = TestHelper.getTestMethodName() + "_1";
+ String workflowName2 = TestHelper.getTestMethodName() + "_2";
+
+ Workflow.Builder builder1 = new Workflow.Builder(workflowName1);
+ Workflow.Builder builder2 = new Workflow.Builder(workflowName2);
+
+ JobConfig.Builder jobBuilder1 = new JobConfig.Builder().setWorkflow(workflowName1)
+ .setNumberOfTasks(5).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+ JobConfig.Builder jobBuilder2 = new JobConfig.Builder().setWorkflow(workflowName2)
+ .setNumberOfTasks(1).setNumConcurrentTasksPerInstance(100).setCommand(MockTask.TASK_COMMAND)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10000"));
+
+ builder1.addJob("JOB0", jobBuilder1);
+ builder2.addJob("JOB0", jobBuilder2);
+
+ _driver.start(builder1.build());
+ _driver.start(builder2.build());
+
+ String namespacedJobName1 = TaskUtil.getNamespacedJobName(workflowName1, "JOB0");
+ String namespacedJobName2 = TaskUtil.getNamespacedJobName(workflowName2, "JOB0");
+
+ _driver.pollForJobState(workflowName1, namespacedJobName1, TaskState.IN_PROGRESS);
+ _driver.pollForJobState(workflowName2, namespacedJobName2, TaskState.IN_PROGRESS);
+
+ // Get the current states of Participant0
+ String instanceP0 = PARTICIPANT_PREFIX + "_" + (_startPort + 0);
+ ZkClient clientP0 = (ZkClient) _participants[0].getZkClient();
+ String sessionIdP0 = ZkTestHelper.getSessionId(clientP0);
+ String jobCurrentStatePath1 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
+ + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName1;
+ String jobCurrentStatePath2 = "/" + CLUSTER_NAME + "/INSTANCES/" + instanceP0
+ + "/CURRENTSTATES/" + sessionIdP0 + "/" + namespacedJobName2;
+
+ // Read the current states of Participant0 and make sure they have been created
+ boolean isCurrentStateCreated = TestHelper.verify(() -> {
+ ZNRecord recordJob1 = _manager.getHelixDataAccessor().getBaseDataAccessor()
+ .get(jobCurrentStatePath1, new Stat(), AccessOption.PERSISTENT);
+ ZNRecord recordJob2 = _manager.getHelixDataAccessor().getBaseDataAccessor()
+ .get(jobCurrentStatePath2, new Stat(), AccessOption.PERSISTENT);
+ Map<String, String> taskCurrentState = null;
+ if (recordJob2 != null){
+ taskCurrentState = recordJob2.getMapField(namespacedJobName2 + "_0");
+ }
+ return (recordJob1 != null && recordJob2 != null && taskCurrentState != null);
+ }, TestHelper.WAIT_DURATION);
+ Assert.assertTrue(isCurrentStateCreated);
+
+ ZNRecord recordTask = _manager.getHelixDataAccessor().getBaseDataAccessor()
+ .get(jobCurrentStatePath2, new Stat(), AccessOption.PERSISTENT);
+ Map<String, String> taskCurrentState = recordTask.getMapField(namespacedJobName2 + "_0");
+ taskCurrentState.put(CurrentState.CurrentStateProperty.CURRENT_STATE.name(), null);
+ recordTask.setMapField(namespacedJobName2 + "_0", taskCurrentState);
+
+ _manager.getHelixDataAccessor().getBaseDataAccessor().set(jobCurrentStatePath2, recordTask,
+ AccessOption.PERSISTENT);
+ _driver.pollForWorkflowState(workflowName1, TestHelper.WAIT_DURATION, TaskState.COMPLETED);
+ }
+}