Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/31 20:49:37 UTC

helix git commit: [HELIX-773] add getLastScheduledTaskTimestamp information in workflow rest api

Repository: helix
Updated Branches:
  refs/heads/master 0c251bbf6 -> 566d4f166


[HELIX-773] add getLastScheduledTaskTimestamp information in workflow rest api


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/566d4f16
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/566d4f16
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/566d4f16

Branch: refs/heads/master
Commit: 566d4f166473b477ea0db1cfba5d04c8f3d6bf30
Parents: 0c251bb
Author: Harry Zhang <hr...@linkedin.com>
Authored: Tue Oct 30 16:43:25 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Wed Oct 31 13:48:46 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |  22 +++-
 .../apache/helix/task/TaskExecutionInfo.java    |  65 ++++++++++
 .../task/TestGetLastScheduledTaskExecInfo.java  | 122 +++++++++++++++++++
 .../task/TestGetLastScheduledTaskTimestamp.java | 110 -----------------
 .../resources/helix/WorkflowAccessor.java       |   5 +-
 .../helix/rest/server/TestWorkflowAccessor.java |   9 +-
 6 files changed, 213 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
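The net effect of the patch is that TaskDriver callers can now retrieve the full
execution info (job name, partition, partition state, start time) of the most
recently scheduled task, not just its start timestamp. A rough usage sketch; the
cluster name, ZooKeeper address, and workflow name below are placeholders, not
part of this patch:

  import org.apache.helix.HelixManager;
  import org.apache.helix.HelixManagerFactory;
  import org.apache.helix.InstanceType;
  import org.apache.helix.task.TaskDriver;
  import org.apache.helix.task.TaskExecutionInfo;

  public class LastScheduledTaskExample {
    public static void main(String[] args) throws Exception {
      // Placeholder cluster / ZK address; adjust for a real deployment.
      HelixManager manager = HelixManagerFactory.getZKHelixManager(
          "MyCluster", "admin", InstanceType.ADMINISTRATOR, "localhost:2181");
      manager.connect();
      try {
        TaskDriver driver = new TaskDriver(manager);
        TaskExecutionInfo info = driver.getLastScheduledTaskExecutionInfo("myWorkflow");
        if (info.getStartTimeStamp() == TaskExecutionInfo.TIMESTAMP_NOT_SET) {
          System.out.println("Nothing has been scheduled for myWorkflow yet");
        } else {
          System.out.println(info.getJobName() + ", partition " + info.getTaskPartitionIndex()
              + ", state " + info.getTaskPartitionState()
              + ", started at " + info.getStartTimeStamp());
        }
        // The original timestamp-only method keeps working; it now delegates to the call above.
        System.out.println("Last scheduled timestamp: "
            + driver.getLastScheduledTaskTimestamp("myWorkflow"));
      } finally {
        manager.disconnect();
      }
    }
  }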


http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index e6256ed..54e3ab3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -85,7 +85,6 @@ public class TaskDriver {
   // TODO Implement or configure the limitation in ZK server.
   private final static long DEFAULT_CONFIGS_LIMITATION =
       HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.TASK_CONFIG_LIMITATION, 100000L);
-  private final static long TIMESTAMP_NOT_SET = -1L;
   private final static String TASK_START_TIME_KEY = "START_TIME";
   protected long _configsLimitation = DEFAULT_CONFIGS_LIMITATION;
 
@@ -977,14 +976,22 @@ public class TaskDriver {
    * -1L if timestamp is not set (either nothing is scheduled or no start time recorded).
    */
   public long getLastScheduledTaskTimestamp(String workflowName) {
-    long lastScheduledTaskTimestamp = TIMESTAMP_NOT_SET;
+    return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp();
+  }
+
+  public TaskExecutionInfo getLastScheduledTaskExecutionInfo(String workflowName) {
+    long lastScheduledTaskTimestamp = TaskExecutionInfo.TIMESTAMP_NOT_SET;
+    String jobName = null;
+    Integer taskPartitionIndex = null;
+    TaskPartitionState state = null;
+
 
     WorkflowContext workflowContext = getWorkflowContext(workflowName);
     if (workflowContext != null) {
       Map<String, TaskState> allJobStates = workflowContext.getJobStates();
-      for (String job : allJobStates.keySet()) {
-        if (!allJobStates.get(job).equals(TaskState.NOT_STARTED)) {
-          JobContext jobContext = getJobContext(job);
+      for (Map.Entry<String, TaskState> jobState : allJobStates.entrySet()) {
+        if (!jobState.getValue().equals(TaskState.NOT_STARTED)) {
+          JobContext jobContext = getJobContext(jobState.getKey());
           if (jobContext != null) {
             Set<Integer> allPartitions = jobContext.getPartitionSet();
             for (Integer partition : allPartitions) {
@@ -993,6 +1000,9 @@ public class TaskDriver {
                 long startTimeLong = Long.parseLong(startTime);
                 if (startTimeLong > lastScheduledTaskTimestamp) {
                   lastScheduledTaskTimestamp = startTimeLong;
+                  jobName = jobState.getKey();
+                  taskPartitionIndex = partition;
+                  state = jobContext.getPartitionState(partition);
                 }
               }
             }
@@ -1000,7 +1010,7 @@ public class TaskDriver {
         }
       }
     }
-    return lastScheduledTaskTimestamp;
+    return new TaskExecutionInfo(jobName, taskPartitionIndex, state, lastScheduledTaskTimestamp);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java b/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java
new file mode 100644
index 0000000..03d66b4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskExecutionInfo.java
@@ -0,0 +1,65 @@
+package org.apache.helix.task;
+
+import java.io.IOException;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TaskExecutionInfo {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  public final static long TIMESTAMP_NOT_SET = -1L;
+  private final String _jobName;
+  private final Integer _taskPartitionIndex;
+  private final TaskPartitionState _taskPartitionState;
+  private final Long _startTimeStamp;
+
+  @JsonCreator
+  public TaskExecutionInfo(
+      @JsonProperty("jobName") String job,
+      @JsonProperty("taskPartitionIndex") Integer index,
+      @JsonProperty("taskPartitionState") TaskPartitionState state,
+      @JsonProperty("startTimeStamp") Long timeStamp) {
+    _jobName = job;
+    _taskPartitionIndex = index;
+    _taskPartitionState = state;
+    _startTimeStamp = timeStamp == null ? TIMESTAMP_NOT_SET : timeStamp;
+  }
+
+  public String getJobName() {
+    return _jobName;
+  }
+
+  public Integer getTaskPartitionIndex() {
+    return _taskPartitionIndex;
+  }
+
+  public TaskPartitionState getTaskPartitionState() {
+    return _taskPartitionState;
+  }
+
+  public Long getStartTimeStamp() {
+    return _startTimeStamp;
+  }
+
+  public String toJson() throws IOException {
+    return OBJECT_MAPPER.writeValueAsString(this);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof TaskExecutionInfo)) {
+      return false;
+    }
+    TaskExecutionInfo infoObj = (TaskExecutionInfo) obj;
+    return nullOrEquals(getJobName(), infoObj.getJobName()) &&
+        nullOrEquals(getTaskPartitionIndex(), infoObj.getTaskPartitionIndex()) &&
+        nullOrEquals(getTaskPartitionState(), infoObj.getTaskPartitionState()) &&
+        nullOrEquals(getStartTimeStamp(), infoObj.getStartTimeStamp());
+  }
+
+  private boolean nullOrEquals(Object o1, Object o2) {
+    return (o1 == null && o2 == null) || (o1 != null && o2 != null && o1.equals(o2));
+  }
+}
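
The new DTO is a plain Jackson bean, so it round-trips to JSON directly. A small
sketch of what serialization might look like; the values are made up, and the exact
output depends on Jackson's default property naming:

  import org.apache.helix.task.TaskExecutionInfo;
  import org.apache.helix.task.TaskPartitionState;

  public class TaskExecutionInfoJsonDemo {
    public static void main(String[] args) throws Exception {
      // Hypothetical job name and partition index, purely for illustration.
      TaskExecutionInfo info =
          new TaskExecutionInfo("myJob", 2, TaskPartitionState.COMPLETED, 1540900000000L);
      // Property names come from the getters, so the output should resemble:
      // {"jobName":"myJob","taskPartitionIndex":2,"taskPartitionState":"COMPLETED","startTimeStamp":1540900000000}
      System.out.println(info.toJson());
    }
  }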

http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
new file mode 100644
index 0000000..73fe674
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
@@ -0,0 +1,122 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestGetLastScheduledTaskExecInfo extends TaskTestBase {
+  private final static String TASK_START_TIME_KEY = "START_TIME";
+  private final static long INVALID_TIMESTAMP = -1L;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testGetLastScheduledTaskExecInfo() throws InterruptedException {
+    List<Long> startTimesWithStuckTasks = setupTasks("TestWorkflow_2", 5, 99999999);
+
+    // First two must be -1 (two tasks are stuck), and API call must return the last value (most recent timestamp)
+    Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), INVALID_TIMESTAMP);
+    Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), INVALID_TIMESTAMP);
+    TaskExecutionInfo execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_2");
+    Long lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_2");
+    Assert.assertEquals(startTimesWithStuckTasks.get(3), lastScheduledTaskTs);
+
+    Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_2_job_0");
+    // Workflow 2 will be stuck, so its partition state will be RUNNING
+    Assert.assertEquals(execInfo.getTaskPartitionState(), TaskPartitionState.RUNNING);
+    Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs);
+
+    List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10);
+    // API call needs to return the most recent timestamp (value at last index)
+    lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp("TestWorkflow_3");
+    execInfo = _driver.getLastScheduledTaskExecutionInfo("TestWorkflow_3");
+
+    Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1), lastScheduledTaskTs);
+    Assert.assertEquals(execInfo.getJobName(), "TestWorkflow_3_job_0");
+    Assert.assertEquals(execInfo.getTaskPartitionState(), TaskPartitionState.COMPLETED);
+    Assert.assertEquals(execInfo.getStartTimeStamp(), lastScheduledTaskTs);
+  }
+
+  /**
+   * Helper method for gathering the start times of all tasks. Returns start times in ascending order. Missing start
+   * times are recorded as -1 (INVALID_TIMESTAMP).
+   *
+   * @param jobQueueName name of the queue
+   * @param numTasks number of tasks to schedule
+   * @param taskTimeout how long each task should run (passed to MockTask as its delay)
+   * @return list of timestamps for all tasks in ascending order
+   * @throws InterruptedException
+   */
+  private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout) throws InterruptedException {
+    // Create a queue
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName);
+
+    // Create and enqueue a job
+    JobConfig.Builder jobConfig = new JobConfig.Builder();
+
+    // Create tasks
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    for (int i = 0; i < numTasks; i++) {
+      taskConfigs.add(new TaskConfig.Builder()
+          .setTaskId("task_" + i)
+          .setCommand(MockTask.TASK_COMMAND)
+          .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout))
+          .build());
+    }
+    // Run up to 2 tasks at a time
+    jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2);
+    queueBuilder.enqueueJob("job_0", jobConfig);
+    _driver.start(queueBuilder.build());
+    // 1 second delay for the Controller
+    Thread.sleep(1000);
+
+    // Pull jobContexts and look at the start times
+    List<Long> startTimes = new ArrayList<>();
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName);
+    for (String job : workflowConfig.getJobDag().getAllNodes()) {
+      JobContext jobContext = _driver.getJobContext(job);
+      Set<Integer> allPartitions = jobContext.getPartitionSet();
+      for (Integer partition : allPartitions) {
+        String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY);
+        if (timestamp == null) {
+          startTimes.add(INVALID_TIMESTAMP);
+        } else {
+          startTimes.add(Long.parseLong(timestamp));
+        }
+      }
+    }
+    Collections.sort(startTimes);
+    return startTimes;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
deleted file mode 100644
index 174d8c6..0000000
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import org.apache.helix.integration.task.MockTask;
-import org.apache.helix.integration.task.TaskTestBase;
-import org.apache.helix.integration.task.TaskTestUtil;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-public class TestGetLastScheduledTaskTimestamp extends TaskTestBase {
-  private final static String TASK_START_TIME_KEY = "START_TIME";
-  private final static long INVALID_TIMESTAMP = -1L;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    setSingleTestEnvironment();
-    super.beforeClass();
-  }
-
-  @Test
-  public void testGetLastScheduledTaskTimestamp() throws InterruptedException {
-    List<Long> startTimesWithStuckTasks = setupTasks("TestWorkflow_2", 5, 99999999);
-    // First two must be -1 (two tasks are stuck), and API call must return the last value (most recent timestamp)
-    Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), INVALID_TIMESTAMP);
-    Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), INVALID_TIMESTAMP);
-    Assert.assertEquals(startTimesWithStuckTasks.get(3).longValue(),
-        _driver.getLastScheduledTaskTimestamp("TestWorkflow_2"));
-
-    List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10);
-    // API call needs to return the most recent timestamp (value at last index)
-    Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1).longValue(),
-        _driver.getLastScheduledTaskTimestamp("TestWorkflow_3"));
-  }
-
-  /**
-   * Helper method for gathering start times for all tasks. Returns start times in ascending order. Null start times
-   * are recorded as 0.
-   *
-   * @param jobQueueName name of the queue
-   * @param numTasks number of tasks to schedule
-   * @param taskTimeout duration of each task to be run for
-   * @return list of timestamps for all tasks in ascending order
-   * @throws InterruptedException
-   */
-  private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout) throws InterruptedException {
-    // Create a queue
-    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName);
-
-    // Create and enqueue a job
-    JobConfig.Builder jobConfig = new JobConfig.Builder();
-
-    // Create tasks
-    List<TaskConfig> taskConfigs = new ArrayList<>();
-    for (int i = 0; i < numTasks; i++) {
-      taskConfigs.add(new TaskConfig.Builder()
-          .setTaskId("task_" + i)
-          .setCommand(MockTask.TASK_COMMAND)
-          .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout))
-          .build());
-    }
-    // Run up to 2 tasks at a time
-    jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2);
-    queueBuilder.enqueueJob("job_0", jobConfig);
-    _driver.start(queueBuilder.build());
-    // 1 second delay for the Controller
-    Thread.sleep(1000);
-
-    // Pull jobContexts and look at the start times
-    List<Long> startTimes = new ArrayList<>();
-    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName);
-    for (String job : workflowConfig.getJobDag().getAllNodes()) {
-      JobContext jobContext = _driver.getJobContext(job);
-      Set<Integer> allPartitions = jobContext.getPartitionSet();
-      for (Integer partition : allPartitions) {
-        String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY);
-        if (timestamp == null) {
-          startTimes.add(INVALID_TIMESTAMP);
-        } else {
-          startTimes.add(Long.parseLong(timestamp));
-        }
-      }
-    }
-    Collections.sort(startTimes);
-    return startTimes;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
index 7cfbe85..9a9a62b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
@@ -62,7 +62,8 @@ public class WorkflowAccessor extends AbstractHelixResource {
     WorkflowConfig,
     WorkflowContext,
     Jobs,
-    ParentJobs
+    ParentJobs,
+    LastScheduledTask
   }
 
   public enum TaskCommand {
@@ -112,7 +113,7 @@ public class WorkflowAccessor extends AbstractHelixResource {
     ObjectNode parentJobs = OBJECT_MAPPER.valueToTree(jobDag.getChildrenToParents());
     root.put(WorkflowProperties.Jobs.name(), jobs);
     root.put(WorkflowProperties.ParentJobs.name(), parentJobs);
-
+    root.put(WorkflowProperties.LastScheduledTask.name(), OBJECT_MAPPER.valueToTree(taskDriver.getLastScheduledTaskExecutionInfo(workflowId)));
     return JSONRepresentation(root);
   }
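
With this hunk, the workflow GET response carries a LastScheduledTask node alongside
Jobs and ParentJobs. A sketch of how a REST client might read it back; the HTTP fetch
itself is omitted, and 'body' is assumed to hold the JSON returned by
GET clusters/{cluster}/workflows/{workflow}:

  import org.apache.helix.task.TaskExecutionInfo;
  import org.codehaus.jackson.JsonNode;
  import org.codehaus.jackson.map.ObjectMapper;

  public class WorkflowLastTaskReader {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // 'body' is the JSON string returned by the workflow GET endpoint.
    public static TaskExecutionInfo readLastScheduledTask(String body) throws Exception {
      JsonNode root = MAPPER.readTree(body);
      // Field name matches WorkflowProperties.LastScheduledTask.name()
      return MAPPER.treeToValue(root.get("LastScheduledTask"), TaskExecutionInfo.class);
    }
  }

For a workflow where nothing has been scheduled yet, the deserialized object compares
equal to new TaskExecutionInfo(null, null, null, TaskExecutionInfo.TIMESTAMP_NOT_SET),
which is what the updated TestWorkflowAccessor below asserts.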
 

http://git-wip-us.apache.org/repos/asf/helix/blob/566d4f16/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
index ad8894a..3e3b8ae 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestWorkflowAccessor.java
@@ -6,12 +6,12 @@ import java.util.Set;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
 import org.apache.helix.TestHelper;
 import org.apache.helix.rest.server.resources.helix.WorkflowAccessor;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskExecutionInfo;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.codehaus.jackson.JsonNode;
@@ -51,12 +51,17 @@ public class TestWorkflowAccessor extends AbstractTestClass {
   @Test(dependsOnMethods = "testGetWorkflows")
   public void testGetWorkflow() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
-
     String body = get("clusters/" + CLUSTER_NAME + "/workflows/" + WORKFLOW_NAME,
         Response.Status.OK.getStatusCode(), true);
     JsonNode node = OBJECT_MAPPER.readTree(body);
     Assert.assertNotNull(node.get(WorkflowAccessor.WorkflowProperties.WorkflowConfig.name()));
     Assert.assertNotNull(node.get(WorkflowAccessor.WorkflowProperties.WorkflowContext.name()));
+
+    TaskExecutionInfo lastScheduledTask = OBJECT_MAPPER
+        .treeToValue(node.get(WorkflowAccessor.WorkflowProperties.LastScheduledTask.name()),
+            TaskExecutionInfo.class);
+    Assert.assertEquals(lastScheduledTask,
+        new TaskExecutionInfo(null, null, null, TaskExecutionInfo.TIMESTAMP_NOT_SET));
     String workflowId =
         node.get(WorkflowAccessor.WorkflowProperties.WorkflowConfig.name()).get("WorkflowID")
             .getTextValue();