You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/01/04 00:40:47 UTC

[incubator-pinot] branch master updated: [TE] backend - use task actual start time as start_time instead of create time (#3638)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ef32663  [TE] backend - use task actual start time as start_time instead of create time (#3638)
ef32663 is described below

commit ef32663fe45003c17272e24057e76cfe830fe8a3
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Thu Jan 3 16:40:42 2019 -0800

    [TE] backend - use task actual start time as start_time instead of create time (#3638)
---
 .../thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java         |  1 -
 .../anomaly/classification/ClassificationJobRunner.java     |  1 -
 .../thirdeye/anomaly/detection/DetectionJobRunner.java      |  1 -
 .../linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java |  1 -
 .../thirdeye/anomaly/onboard/DetectionOnboardResource.java  |  1 -
 .../java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java | 13 +++++++++++++
 .../completeness/checker/DataCompletenessJobRunner.java     |  1 -
 .../com/linkedin/thirdeye/datalayer/bao/TaskManager.java    |  2 ++
 .../thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java        |  7 +++++++
 .../linkedin/thirdeye/detection/DetectionPipelineJob.java   |  1 -
 .../thirdeye/detection/alert/DetectionAlertJob.java         |  1 -
 .../thirdeye/datalayer/bao/TestAnomalyTaskManager.java      |  8 ++++++++
 12 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
index 5d1af05..331a07d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
@@ -140,7 +140,6 @@ public class AlertJobRunnerV2 implements Job {
         taskSpec.setTaskType(TaskConstants.TaskType.ALERT2);
         taskSpec.setJobName(alertJobContext.getJobName());
         taskSpec.setStatus(TaskConstants.TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(alertJobContext.getJobExecutionId());
         long taskId = taskDAO.save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
index 40d7d01..23218b4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
@@ -90,7 +90,6 @@ public class ClassificationJobRunner implements JobRunner {
         taskSpec.setTaskType(TaskConstants.TaskType.CLASSIFICATION);
         taskSpec.setJobName(classificationJobContext.getJobName());
         taskSpec.setStatus(TaskConstants.TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(classificationJobContext.getJobExecutionId());
         long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
index 8a2578c..4e4447d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
@@ -161,7 +161,6 @@ public class DetectionJobRunner {
         taskSpec.setTaskType(TaskType.ANOMALY_DETECTION);
         taskSpec.setJobName(detectionJobContext.getJobName());
         taskSpec.setStatus(TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(detectionJobContext.getJobExecutionId());
         long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
index 7074343..0cf9225 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
@@ -108,7 +108,6 @@ public class MonitorJobRunner implements JobRunner {
         taskSpec.setTaskType(TaskType.MONITOR);
         taskSpec.setJobName(monitorJobContext.getJobName());
         taskSpec.setStatus(TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(monitorJobContext.getJobExecutionId());
         long taskId = taskDAO.save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
index 80ff96f..648e76f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
@@ -111,7 +111,6 @@ public class DetectionOnboardResource {
       task.setTaskType(TaskConstants.TaskType.REPLAY);
       task.setJobName(jobName);
       task.setStatus(TaskConstants.TaskStatus.WAITING);
-      task.setStartTime(System.currentTimeMillis());
       task.setTaskInfo(taskInfoJson);
       task.setJobId(anomalyFunctionId);
       this.taskDAO.save(task);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index 3e81042..a4e9c7f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -94,6 +94,8 @@ public class TaskDriver {
                 TaskType taskType = anomalyTaskSpec.getTaskType();
                 TaskRunner taskRunner = TaskRunnerFactory.getTaskRunnerFromTaskType(taskType);
                 TaskInfo taskInfo = TaskInfoFactory.getTaskInfoFromTaskType(taskType, anomalyTaskSpec.getTaskInfo());
+
+                updateTaskStartTime(anomalyTaskSpec.getId());
                 LOG.info("Thread {} : Task Info {}", Thread.currentThread().getId(), taskInfo);
                 List<TaskResult> taskResults = taskRunner.execute(taskInfo, taskContext);
                 LOG.info("Thread {} : DONE Executing task: {}", Thread.currentThread().getId(), anomalyTaskSpec.getId());
@@ -196,6 +198,17 @@ public class TaskDriver {
     return null;
   }
 
+  private void updateTaskStartTime(long taskId) {
+    LOG.info("Thread {} : Starting updateTaskStartTime for task id {}", Thread.currentThread().getId(), taskId);
+    try {
+      long startTime = System.currentTimeMillis();
+      taskDAO.updateTaskStartTime(taskId, startTime);
+      LOG.info("Thread {} : Updated task start time {}", Thread.currentThread().getId(), startTime);
+    } catch (Exception e) {
+      LOG.error("Exception in updating task start time", e);
+    }
+  }
+
   private void updateStatusAndTaskEndTime(long taskId, TaskStatus oldStatus, TaskStatus newStatus, String message) {
     LOG.info("Thread {} : Starting updateStatus for task id {}", Thread.currentThread().getId(), taskId);
     try {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
index 84e4580..d5697c0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
@@ -150,7 +150,6 @@ public class DataCompletenessJobRunner implements JobRunner {
         taskSpec.setTaskType(TaskType.DATA_COMPLETENESS);
         taskSpec.setJobName(dataCompletenessJobContext.getJobName());
         taskSpec.setStatus(TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(dataCompletenessJobContext.getJobExecutionId());
         long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
index 38d46a4..a9e79e7 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
@@ -40,6 +40,8 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
   void updateStatusAndTaskEndTime(Long id, TaskStatus oldStatus, TaskStatus newStatus,
       Long taskEndTime, String message);
 
+  void updateTaskStartTime(Long id, Long taskStartTime);
+
   int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status);
 
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index 141a330..d7a133c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -114,6 +114,13 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
   }
 
   @Override
+  public void updateTaskStartTime(Long id, Long taskStartTime) {
+    TaskDTO task = findById(id);
+    task.setStartTime(taskStartTime);
+    save(task);
+  }
+
+  @Override
   @Transactional
   public int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status) {
     DateTime expireDate = new DateTime().minusDays(days);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 37cc3f7..65b7452 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -56,7 +56,6 @@ public class DetectionPipelineJob implements Job {
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
     taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION, id));
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
-    taskDTO.setStartTime(System.currentTimeMillis());
     taskDTO.setTaskInfo(taskInfoJson);
 
     long taskId = taskDAO.save(taskDTO);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
index c7d3eba..4b96846 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
@@ -69,7 +69,6 @@ public class DetectionAlertJob implements Job {
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ALERT);
     taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId));
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
-    taskDTO.setStartTime(System.currentTimeMillis());
     taskDTO.setTaskInfo(taskInfoJson);
 
     long taskId = taskDAO.save(taskDTO);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
index c9621c2..9935079 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
@@ -121,6 +121,14 @@ public class TestAnomalyTaskManager {
     Assert.assertEquals(anomalyTaskSpecs.size(), 1);
   }
 
+  @Test(dependsOnMethods = {"testCreate"})
+  public void testUpdateTaskStartTime() {
+    long taskStartTime = System.currentTimeMillis();
+    taskDAO.updateTaskStartTime(anomalyTaskId1, taskStartTime);
+    TaskDTO anomalyTask = taskDAO.findById(anomalyTaskId1);
+    Assert.assertEquals(anomalyTask.getStartTime(), taskStartTime);
+  }
+
   @Test(dependsOnMethods = {"testFindByJobIdStatusNotIn"})
   public void testDeleteRecordOlderThanDaysWithStatus() {
     TaskStatus status = TaskStatus.COMPLETED;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org