You are viewing a plain-text version of this content. The link to the canonical version ("here") is not preserved in this plain-text rendering.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/31 20:49:37 UTC
helix git commit: [HELIX-773] add getLastScheduledTaskTimestamp information in workflow rest api
Repository: helix
Updated Branches:
refs/heads/master 0c251bbf6 -> 566d4f166
[HELIX-773] add getLastScheduledTaskTimestamp information in workflow rest api
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/566d4f16
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/566d4f16
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/566d4f16
Branch: refs/heads/master
Commit: 566d4f166473b477ea0db1cfba5d04c8f3d6bf30
Parents: 0c251bb
Author: Harry Zhang <hr...@linkedin.com>
Authored: Tue Oct 30 16:43:25 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Wed Oct 31 13:48:46 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 22 +++-
.../apache/helix/task/TaskExecutionInfo.java | 65 ++++++++++
.../task/TestGetLastScheduledTaskExecInfo.java | 122 +++++++++++++++++++
.../task/TestGetLastScheduledTaskTimestamp.java | 110 -----------------
.../resources/helix/WorkflowAccessor.java | 5 +-
.../helix/rest/server/TestWorkflowAccessor.java | 9 +-
6 files changed, 213 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index e6256ed..54e3ab3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -85,7 +85,6 @@ public class TaskDriver {
// TODO Implement or configure the limitation in ZK server.
private final static long DEFAULT_CONFIGS_LIMITATION =
HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.TASK_CONFIG_LIMITATION, 100000L);
- private final static long TIMESTAMP_NOT_SET = -1L;
private final static String TASK_START_TIME_KEY = "START_TIME";
protected long _configsLimitation = DEFAULT_CONFIGS_LIMITATION;
@@ -977,14 +976,22 @@ public class TaskDriver {
* -1L if timestamp is not set (either nothing is scheduled or no start time recorded).
*/
public long getLastScheduledTaskTimestamp(String workflowName) {
- long lastScheduledTaskTimestamp = TIMESTAMP_NOT_SET;
+ return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp();
+ }
+
+ public TaskExecutionInfo getLastScheduledTaskExecutionInfo(String workflowName) {
+ long lastScheduledTaskTimestamp = TaskExecutionInfo.TIMESTAMP_NOT_SET;
+ String jobName = null;
+ Integer taskPartitionIndex = null;
+ TaskPartitionState state = null;
+
WorkflowContext workflowContext = getWorkflowContext(workflowName);
if (workflowContext != null) {
Map<String, TaskState> allJobStates = workflowContext.getJobStates();
- for (String job : allJobStates.keySet()) {
- if (!allJobStates.get(job).equals(TaskState.NOT_STARTED)) {
- JobContext jobContext = getJobContext(job);
+ for (Map.Entry<String, TaskState> jobState : allJobStates.entrySet()) {
+ if (!jobState.getValue().equals(TaskState.NOT_STARTED)) {
+ JobContext jobContext = getJobContext(jobState.getKey());
if (jobContext != null) {
Set<Integer> allPartitions = jobContext.getPartitionSet();
for (Integer partition : allPartitions) {
@@ -993,6 +1000,9 @@ public class TaskDriver {
long startTimeLong = Long.parseLong(startTime);
if (startTimeLong > lastScheduledTaskTimestamp) {
lastScheduledTaskTimestamp = startTimeLong;
+ jobName = jobState.getKey();
+ taskPartitionIndex = partition;
+ state = jobContext.getPartitionState(partition);
}
}
}
@@ -1000,7 +1010,7 @@ public class TaskDriver {
}
}
}
- return lastScheduledTaskTimestamp;
+ return new TaskExecutionInfo(jobName, taskPartitionIndex, state, lastScheduledTaskTimestamp);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java b/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java
new file mode 100644
index 0000000..03d66b4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java
@@ -0,0 +1,65 @@
+package org.apache.helix.task;
+
+import java.io.IOException;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TaskExecutionInfo {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ public final static long TIMESTAMP_NOT_SET = -1L;
+ private final String _jobName;
+ private final Integer _taskPartitionIndex;
+ private final TaskPartitionState _taskPartitionState;
+ private final Long _startTimeStamp;
+
+ @JsonCreator
+ public TaskExecutionInfo(
+ @JsonProperty("jobName") String job,
+ @JsonProperty("taskPartitionIndex") Integer index,
+ @JsonProperty("taskPartitionState") TaskPartitionState state,
+ @JsonProperty("startTimeStamp") Long timeStamp) {
+ _jobName = job;
+ _taskPartitionIndex = index;
+ _taskPartitionState = state;
+ _startTimeStamp = timeStamp == null ? TIMESTAMP_NOT_SET : timeStamp;
+ }
+
+ public String getJobName() {
+ return _jobName;
+ }
+
+ public Integer getTaskPartitionIndex() {
+ return _taskPartitionIndex;
+ }
+
+ public TaskPartitionState getTaskPartitionState() {
+ return _taskPartitionState;
+ }
+
+ public Long getStartTimeStamp() {
+ return _startTimeStamp;
+ }
+
+ public String toJson() throws IOException {
+ return OBJECT_MAPPER.writeValueAsString(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof TaskExecutionInfo)) {
+ return false;
+ }
+ TaskExecutionInfo infoObj = (TaskExecutionInfo) obj;
+ return nullOrEquals(getJobName(), infoObj.getJobName()) &&
+ nullOrEquals(getTaskPartitionIndex(), infoObj.getTaskPartitionIndex()) &&
+ nullOrEquals(getTaskPartitionState(), infoObj.getTaskPartitionState()) &&
+ nullOrEquals(getStartTimeStamp(), infoObj.getStartTimeStamp());
+ }
+
+ private boolean nullOrEquals(Object o1, Object o2) {
+ return (o1 == null && o2 == null) || (o1 != null && o2 != null && o1.equals(o2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
new file mode 100644
index 0000000..73fe674
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
@@ -0,0 +1,122 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestGetLastScheduledTaskExecInfo extends TaskTestBase {
+ private final static String TASK_START_TIME_KEY = "START_TIME";
+ private final static long INVALID_TIMESTAMP = -1L;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ setSingleTestEnvironment();
+ super.beforeClass();
+ }
+
+ @Test
+ public void testGetLastScheduledTaskExecInfo() throws InterruptedException {
+ List<Long> startTimesWithStuckTasks = setupTasks("TestWorkflow_2", 5, 99999999);
+
+ // First two must be -1 (two tasks are stuck), and API call must return the last value (most recent timestamp)
+ Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), INVALID_TIMESTAMP);
+ Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), INVALID_TIMESTAMP);
+ TaskExecutionInfo execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_2");
+ Long lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_2");
+ Assert.assertEquals(startTimesWithStuckTasks.get(3), lastScheduledTaskTs);
+
+ Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_2_job_0");
+ // Workflow 2 will stuck, so its partition state will be RUNNING
+ Assert.assertEquals(execInfo.getTaskPartitionState(), TaskPartitionState.RUNNING);
+ Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs);
+
+ List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10);
+ // API call needs to return the most recent timestamp (value at last index)
+ lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_3");
+ execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3");
+
+ Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1), lastScheduledTaskTs);
+ Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_3_job_0");
+ Assert.assertEquals(execInfo.getTaskPartitionState(), TaskPartitionState.COMPLETED);
+ Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs);
+ }
+
+ /**
+ * Helper method for gathering start times for all tasks. Returns start times in ascending order. Null start times
+ * are recorded as 0.
+ *
+ * @param jobQueueName name of the queue
+ * @param numTasks number of tasks to schedule
+ * @param taskTimeout duration of each task to be run for
+ * @return list of timestamps for all tasks in ascending order
+ * @throws InterruptedException
+ */
+ private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout) throws InterruptedException {
+ // Create a queue
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName);
+
+ // Create and enqueue a job
+ JobConfig.Builder jobConfig = new JobConfig.Builder();
+
+ // Create tasks
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ taskConfigs.add(new TaskConfig.Builder()
+ .setTaskId("task_" + i)
+ .setCommand(MockTask.TASK_COMMAND)
+ .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout))
+ .build());
+ }
+ // Run up to 2 tasks at a time
+ jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2);
+ queueBuilder.enqueueJob("job_0", jobConfig);
+ _driver.start(queueBuilder.build());
+ // 1 second delay for the Controller
+ Thread.sleep(1000);
+
+ // Pull jobContexts and look at the start times
+ List<Long> startTimes = new ArrayList<>();
+ WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName);
+ for (String job : workflowConfig.getJobDag().getAllNodes()) {
+ JobContext jobContext = _driver.getJobContext(job);
+ Set<Integer> allPartitions = jobContext.getPartitionSet();
+ for (Integer partition : allPartitions) {
+ String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY);
+ if (timestamp == null) {
+ startTimes.add(INVALID_TIMESTAMP);
+ } else {
+ startTimes.add(Long.parseLong(timestamp));
+ }
+ }
+ }
+ Collections.sort(startTimes);
+ return startTimes;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
deleted file mode 100644
index 174d8c6..0000000
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import org.apache.helix.integration.task.MockTask;
-import org.apache.helix.integration.task.TaskTestBase;
-import org.apache.helix.integration.task.TaskTestUtil;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-public class TestGetLastScheduledTaskTimestamp extends TaskTestBase {
- private final static String TASK_START_TIME_KEY = "START_TIME";
- private final static long INVALID_TIMESTAMP = -1L;
-
- @BeforeClass
- public void beforeClass() throws Exception {
- setSingleTestEnvironment();
- super.beforeClass();
- }
-
- @Test
- public void testGetLastScheduledTaskTimestamp() throws InterruptedException {
- List<Long> startTimesWithStuckTasks = setupTasks("TestWorkflow_2", 5, 99999999);
- // First two must be -1 (two tasks are stuck), and API call must return the last value (most recent timestamp)
- Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), INVALID_TIMESTAMP);
- Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), INVALID_TIMESTAMP);
- Assert.assertEquals(startTimesWithStuckTasks.get(3).longValue(),
- _driver.getLastScheduledTaskTimestamp("TestWorkflow_2"));
-
- List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10);
- // API call needs to return the most recent timestamp (value at last index)
- Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1).longValue(),
- _driver.getLastScheduledTaskTimestamp("TestWorkflow_3"));
- }
-
- /**
- * Helper method for gathering start times for all tasks. Returns start times in ascending order. Null start times
- * are recorded as 0.
- *
- * @param jobQueueName name of the queue
- * @param numTasks number of tasks to schedule
- * @param taskTimeout duration of each task to be run for
- * @return list of timestamps for all tasks in ascending order
- * @throws InterruptedException
- */
- private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout) throws InterruptedException {
- // Create a queue
- JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName);
-
- // Create and enqueue a job
- JobConfig.Builder jobConfig = new JobConfig.Builder();
-
- // Create tasks
- List<TaskConfig> taskConfigs = new ArrayList<>();
- for (int i = 0; i < numTasks; i++) {
- taskConfigs.add(new TaskConfig.Builder()
- .setTaskId("task_" + i)
- .setCommand(MockTask.TASK_COMMAND)
- .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout))
- .build());
- }
- // Run up to 2 tasks at a time
- jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2);
- queueBuilder.enqueueJob("job_0", jobConfig);
- _driver.start(queueBuilder.build());
- // 1 second delay for the Controller
- Thread.sleep(1000);
-
- // Pull jobContexts and look at the start times
- List<Long> startTimes = new ArrayList<>();
- WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName);
- for (String job : workflowConfig.getJobDag().getAllNodes()) {
- JobContext jobContext = _driver.getJobContext(job);
- Set<Integer> allPartitions = jobContext.getPartitionSet();
- for (Integer partition : allPartitions) {
- String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY);
- if (timestamp == null) {
- startTimes.add(INVALID_TIMESTAMP);
- } else {
- startTimes.add(Long.parseLong(timestamp));
- }
- }
- }
- Collections.sort(startTimes);
- return startTimes;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
index 7cfbe85..9a9a62b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
@@ -62,7 +62,8 @@ public class WorkflowAccessor extends AbstractHelixResource {
WorkflowConfig,
WorkflowContext,
Jobs,
- ParentJobs
+ ParentJobs,
+ LastScheduledTask
}
public enum TaskCommand {
@@ -112,7 +113,7 @@ public class WorkflowAccessor extends AbstractHelixResource {
ObjectNode parentJobs = OBJECT_MAPPER.valueToTree(jobDag.getChildrenToParents());
root.put(WorkflowProperties.Jobs.name(), jobs);
root.put(WorkflowProperties.ParentJobs.name(), parentJobs);
-
+ root.put(WorkflowProperties.LastScheduledTask.name(), OBJECT_MAPPER.valueToTree(taskDriver.getLastScheduledTaskExecutionInfo(workflowId)));
return JSONRepresentation(root);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
index ad8894a..3e3b8ae 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
@@ -6,12 +6,12 @@ import java.util.Set;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-
import org.apache.helix.TestHelper;
import org.apache.helix.rest.server.resources.helix.WorkflowAccessor;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskExecutionInfo;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.codehaus.jackson.JsonNode;
@@ -51,12 +51,17 @@ public class TestWorkflowAccessor extends AbstractTestClass {
@Test(dependsOnMethods = "testGetWorkflows")
public void testGetWorkflow() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
-
String body = get("clusters/" + CLUSTER_NAME + "/workflows/" + WORKFLOW_NAME,
Response.Status.OK.getStatusCode(), true);
JsonNode node = OBJECT_MAPPER.readTree(body);
Assert.assertNotNull(node.get(WorkflowAccessor.WorkflowProperties.WorkflowConfig.name()));
Assert.assertNotNull(node.get(WorkflowAccessor.WorkflowProperties.WorkflowContext.name()));
+
+ TaskExecutionInfo lastScheduledTask = OBJECT_MAPPER
+ .treeToValue(node.get(WorkflowAccessor.WorkflowProperties.LastScheduledTask.name()),
+ TaskExecutionInfo.class);
+ Assert.assertTrue(lastScheduledTask
+ .equals(new TaskExecutionInfo(null, null, null, TaskExecutionInfo.TIMESTAMP_NOT_SET)));
String workflowId =
node.get(WorkflowAccessor.WorkflowProperties.WorkflowConfig.name()).get("WorkflowID")
.getTextValue();