You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pinot.apache.org by GitBox <gi...@apache.org> on 2019/01/04 00:40:43 UTC

[GitHub] xiaohui-sun closed pull request #3638: [TE] backend - use task actual start time as start_time

xiaohui-sun closed pull request #3638: [TE] backend - use task actual start time as start_time
URL: https://github.com/apache/incubator-pinot/pull/3638
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub no longer shows
its diff once it has been merged, so the diff is supplied below:

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
index 5d1af051fe..331a07d877 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
@@ -140,7 +140,6 @@ private long createJob(DateTime monitoringWindowStartTime, DateTime monitoringWi
         taskSpec.setTaskType(TaskConstants.TaskType.ALERT2);
         taskSpec.setJobName(alertJobContext.getJobName());
         taskSpec.setStatus(TaskConstants.TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(alertJobContext.getJobExecutionId());
         long taskId = taskDAO.save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
index 40d7d012d1..23218b420e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
@@ -90,7 +90,6 @@ public Long createJob() {
         taskSpec.setTaskType(TaskConstants.TaskType.CLASSIFICATION);
         taskSpec.setJobName(classificationJobContext.getJobName());
         taskSpec.setStatus(TaskConstants.TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(classificationJobContext.getJobExecutionId());
         long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
index 8a2578c012..4e4447da01 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
@@ -161,7 +161,6 @@ private long createJob(String jobName, DateTime monitoringWindowStartTime, DateT
         taskSpec.setTaskType(TaskType.ANOMALY_DETECTION);
         taskSpec.setJobName(detectionJobContext.getJobName());
         taskSpec.setStatus(TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(detectionJobContext.getJobExecutionId());
         long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
index 7074343d7f..0cf9225e93 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
@@ -108,7 +108,6 @@ public Long createJob() {
         taskSpec.setTaskType(TaskType.MONITOR);
         taskSpec.setJobName(monitorJobContext.getJobName());
         taskSpec.setStatus(TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(monitorJobContext.getJobExecutionId());
         long taskId = taskDAO.save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
index 80ff96f28c..648e76fe0d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
@@ -111,7 +111,6 @@ public String createDetectionOnboardingJob(@ApiParam(required = true) @NotNull @
       task.setTaskType(TaskConstants.TaskType.REPLAY);
       task.setJobName(jobName);
       task.setStatus(TaskConstants.TaskStatus.WAITING);
-      task.setStartTime(System.currentTimeMillis());
       task.setTaskInfo(taskInfoJson);
       task.setJobId(anomalyFunctionId);
       this.taskDAO.save(task);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index 3e8104267a..a4e9c7fa1f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -94,6 +94,8 @@ public void start() throws Exception {
                 TaskType taskType = anomalyTaskSpec.getTaskType();
                 TaskRunner taskRunner = TaskRunnerFactory.getTaskRunnerFromTaskType(taskType);
                 TaskInfo taskInfo = TaskInfoFactory.getTaskInfoFromTaskType(taskType, anomalyTaskSpec.getTaskInfo());
+
+                updateTaskStartTime(anomalyTaskSpec.getId());
                 LOG.info("Thread {} : Task Info {}", Thread.currentThread().getId(), taskInfo);
                 List<TaskResult> taskResults = taskRunner.execute(taskInfo, taskContext);
                 LOG.info("Thread {} : DONE Executing task: {}", Thread.currentThread().getId(), anomalyTaskSpec.getId());
@@ -196,6 +198,17 @@ private TaskDTO acquireTask() {
     return null;
   }
 
+  private void updateTaskStartTime(long taskId) {
+    LOG.info("Thread {} : Starting updateTaskStartTime for task id {}", Thread.currentThread().getId(), taskId);
+    try {
+      long startTime = System.currentTimeMillis();
+      taskDAO.updateTaskStartTime(taskId, startTime);
+      LOG.info("Thread {} : Updated task start time {}", Thread.currentThread().getId(), startTime);
+    } catch (Exception e) {
+      LOG.error("Exception in updating task start time", e);
+    }
+  }
+
   private void updateStatusAndTaskEndTime(long taskId, TaskStatus oldStatus, TaskStatus newStatus, String message) {
     LOG.info("Thread {} : Starting updateStatus for task id {}", Thread.currentThread().getId(), taskId);
     try {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
index 84e458046a..d5697c0f34 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
@@ -150,7 +150,6 @@ public Long createJob() {
         taskSpec.setTaskType(TaskType.DATA_COMPLETENESS);
         taskSpec.setJobName(dataCompletenessJobContext.getJobName());
         taskSpec.setStatus(TaskStatus.WAITING);
-        taskSpec.setStartTime(System.currentTimeMillis());
         taskSpec.setTaskInfo(taskInfoJson);
         taskSpec.setJobId(dataCompletenessJobContext.getJobExecutionId());
         long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
index 38d46a44fb..a9e79e7e1f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
@@ -40,6 +40,8 @@ boolean updateStatusAndWorkerId(Long workerId, Long id, Set<TaskStatus> allowedO
   void updateStatusAndTaskEndTime(Long id, TaskStatus oldStatus, TaskStatus newStatus,
       Long taskEndTime, String message);
 
+  void updateTaskStartTime(Long id, Long taskStartTime);
+
   int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status);
 
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index 141a330410..d7a133c995 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -113,6 +113,13 @@ public void updateStatusAndTaskEndTime(Long id, TaskStatus oldStatus, TaskStatus
     }
   }
 
+  @Override
+  public void updateTaskStartTime(Long id, Long taskStartTime) {
+    TaskDTO task = findById(id);
+    task.setStartTime(taskStartTime);
+    save(task);
+  }
+
   @Override
   @Transactional
   public int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 37cc3f76f7..65b7452a55 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -56,7 +56,6 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
     taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION, id));
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
-    taskDTO.setStartTime(System.currentTimeMillis());
     taskDTO.setTaskInfo(taskInfoJson);
 
     long taskId = taskDAO.save(taskDTO);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
index c7d3eba876..4b96846524 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
@@ -69,7 +69,6 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ALERT);
     taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId));
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
-    taskDTO.setStartTime(System.currentTimeMillis());
     taskDTO.setTaskInfo(taskInfoJson);
 
     long taskId = taskDAO.save(taskDTO);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
index c9621c2c06..993507941d 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
@@ -121,6 +121,14 @@ public void testFindByJobIdStatusNotIn() {
     Assert.assertEquals(anomalyTaskSpecs.size(), 1);
   }
 
+  @Test(dependsOnMethods = {"testCreate"})
+  public void testUpdateTaskStartTime() {
+    long taskStartTime = System.currentTimeMillis();
+    taskDAO.updateTaskStartTime(anomalyTaskId1, taskStartTime);
+    TaskDTO anomalyTask = taskDAO.findById(anomalyTaskId1);
+    Assert.assertEquals(anomalyTask.getStartTime(), taskStartTime);
+  }
+
   @Test(dependsOnMethods = {"testFindByJobIdStatusNotIn"})
   public void testDeleteRecordOlderThanDaysWithStatus() {
     TaskStatus status = TaskStatus.COMPLETED;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org
For additional commands, e-mail: dev-help@pinot.apache.org