You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/29 00:53:22 UTC

helix git commit: [HELIX-688] Add method that returns start time of the most recent task scheduled

Repository: helix
Updated Branches:
  refs/heads/master 667b1887f -> e24096839


[HELIX-688] Add method that returns start time of the most recent task scheduled

getLastScheduledTaskTimestamp returns the timestamp for the start time of the task that was scheduled last. Clients of Task Framework may use this API against their time to completion metric to determine if a given workflow/job/task is stuck.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e2409683
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e2409683
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e2409683

Branch: refs/heads/master
Commit: e240968397544423dee9e100fa1b53720c89c7a0
Parents: 667b188
Author: narendly <na...@gmail.com>
Authored: Tue Mar 27 16:25:22 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Tue Mar 27 16:35:04 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |  37 +++++++
 .../task/TestGetLastScheduledTaskTimestamp.java | 109 +++++++++++++++++++
 2 files changed, 146 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e2409683/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 957333c..174cae1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -66,6 +66,8 @@ public class TaskDriver {
   //
   // TODO Implement or configure the limitation in ZK server.
   private final static int DEFAULT_CONFIGS_LIMITATION = 10000;
+  private final static long TIMESTAMP_NOT_SET = -1L; // returned by getLastScheduledTaskTimestamp when no start time is found
+  private final static String TASK_START_TIME_KEY = "START_TIME"; // map-field key holding a partition's start time
   protected int _configsLimitation = DEFAULT_CONFIGS_LIMITATION;
 
   private final HelixDataAccessor _accessor;
@@ -845,6 +847,41 @@ public class TaskDriver {
   }
 
   /**
+   * Returns the start time of the most recently scheduled task in the given workflow. Comparing this value
+   * against an expected time to completion helps determine whether a given Workflow/Job/Task is stuck.
+   *
+   * @param workflowName The name of the workflow
+   * @return the largest START_TIME recorded across all task partitions of the workflow's jobs that are not in
+   * NOT_STARTED state, or -1L if no timestamp is set (either nothing is scheduled or no start time recorded).
+   */
+  public long getLastScheduledTaskTimestamp(String workflowName) {
+    WorkflowContext workflowContext = getWorkflowContext(workflowName);
+    if (workflowContext == null) {
+      return TIMESTAMP_NOT_SET;
+    }
+
+    long latestStartTime = TIMESTAMP_NOT_SET;
+    for (Map.Entry<String, TaskState> jobState : workflowContext.getJobStates().entrySet()) {
+      // Jobs still in NOT_STARTED state are not inspected at all.
+      if (jobState.getValue().equals(TaskState.NOT_STARTED)) {
+        continue;
+      }
+      JobContext jobContext = getJobContext(jobState.getKey());
+      if (jobContext == null) {
+        continue;
+      }
+      for (Integer partition : jobContext.getPartitionSet()) {
+        String startTime = jobContext.getMapField(partition).get(TASK_START_TIME_KEY);
+        // Partitions without a recorded start time do not contribute to the result.
+        if (startTime != null) {
+          latestStartTime = Math.max(latestStartTime, Long.parseLong(startTime));
+        }
+      }
+    }
+    return latestStartTime;
+  }
+
+  /**
    * Throw Exception if children nodes will exceed limitation after adding newNodesCount children.
    * @param newConfigNodeCount
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/e2409683/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
new file mode 100644
index 0000000..5b41584
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
@@ -0,0 +1,109 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestGetLastScheduledTaskTimestamp extends TaskTestBase {
+  private final static String TASK_START_TIME_KEY = "START_TIME"; // map-field key read per partition in setupTasks
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testGetLastScheduledTaskTimestamp() throws InterruptedException {
+    List<Long> startTimesWithStuckTasks = setupTasks("TestWorkflow_2", 5, 99999999);
+    // Entries 0 and 1 must be 0 (the helper's placeholder for a missing start time); entry 3 must match the API call
+    Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), 0);
+    Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), 0);
+    Assert.assertEquals(startTimesWithStuckTasks.get(3).longValue(),
+        _driver.getLastScheduledTaskTimestamp("TestWorkflow_2"));
+
+    List<Long> startTimesFastTasks = setupTasks("TestWorkflow_3", 4, 10);
+    // The sorted list ends with the most recent start time, which is what the API call must return
+    Assert.assertEquals(startTimesFastTasks.get(startTimesFastTasks.size() - 1).longValue(),
+        _driver.getLastScheduledTaskTimestamp("TestWorkflow_3"));
+  }
+
+  /**
+   * Starts a job queue holding one job ("job_0") with numTasks MockTask tasks, sleeps for one second, then collects
+   * the START_TIME of every partition found in the job contexts. Missing start times are recorded as 0.
+   *
+   * @param jobQueueName name of the queue to build and start
+   * @param numTasks number of tasks to add to the job
+   * @param taskTimeout value passed to each task as MockTask.TIMEOUT_CONFIG
+   * @return start times of all collected partitions, sorted in ascending order
+   * @throws InterruptedException if the thread is interrupted while sleeping
+   */
+  private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout) throws InterruptedException {
+    // Builder for the queue that will hold the job
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName);
+
+    // Builder for the single job; it is enqueued below once its tasks are configured
+    JobConfig.Builder jobConfig = new JobConfig.Builder();
+
+    // One MockTask config per task, all sharing the same TIMEOUT_CONFIG value
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    for (int i = 0; i < numTasks; i++) {
+      taskConfigs.add(new TaskConfig.Builder()
+          .setTaskId("task_" + i)
+          .setCommand(MockTask.TASK_COMMAND)
+          .addConfig(MockTask.TIMEOUT_CONFIG, String.valueOf(taskTimeout))
+          .build());
+    }
+    // Allow at most 2 concurrent tasks per instance
+    jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2);
+    queueBuilder.enqueueJob("job_0", jobConfig);
+    _driver.start(queueBuilder.build());
+    // Fixed 1 second delay for the Controller; presumably long enough for scheduling - nothing here verifies it
+    Thread.sleep(1000);
+
+    // Read the job context of every job in the workflow's DAG and record each partition's start time
+    List<Long> startTimes = new ArrayList<>();
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName);
+    for (String job : workflowConfig.getJobDag().getAllNodes()) {
+      JobContext jobContext = _driver.getJobContext(job);
+      Set<Integer> allPartitions = jobContext.getPartitionSet();
+      for (Integer partition : allPartitions) {
+        String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY);
+        if (timestamp == null) {
+          startTimes.add(0L);
+        } else {
+          startTimes.add(Long.parseLong(timestamp));
+        }
+      }
+    }
+    Collections.sort(startTimes);
+    return startTimes;
+  }
+}
\ No newline at end of file