You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/01/04 00:40:47 UTC
[incubator-pinot] branch master updated: [TE] backend - use task
actual start time as start_time instead of create time (#3638)
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ef32663 [TE] backend - use task actual start time as start_time instead of create time (#3638)
ef32663 is described below
commit ef32663fe45003c17272e24057e76cfe830fe8a3
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Thu Jan 3 16:40:42 2019 -0800
[TE] backend - use task actual start time as start_time instead of create time (#3638)
---
.../thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java | 1 -
.../anomaly/classification/ClassificationJobRunner.java | 1 -
.../thirdeye/anomaly/detection/DetectionJobRunner.java | 1 -
.../linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java | 1 -
.../thirdeye/anomaly/onboard/DetectionOnboardResource.java | 1 -
.../java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java | 13 +++++++++++++
.../completeness/checker/DataCompletenessJobRunner.java | 1 -
.../com/linkedin/thirdeye/datalayer/bao/TaskManager.java | 2 ++
.../thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java | 7 +++++++
.../linkedin/thirdeye/detection/DetectionPipelineJob.java | 1 -
.../thirdeye/detection/alert/DetectionAlertJob.java | 1 -
.../thirdeye/datalayer/bao/TestAnomalyTaskManager.java | 8 ++++++++
12 files changed, 30 insertions(+), 8 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
index 5d1af05..331a07d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/alert/v2/AlertJobRunnerV2.java
@@ -140,7 +140,6 @@ public class AlertJobRunnerV2 implements Job {
taskSpec.setTaskType(TaskConstants.TaskType.ALERT2);
taskSpec.setJobName(alertJobContext.getJobName());
taskSpec.setStatus(TaskConstants.TaskStatus.WAITING);
- taskSpec.setStartTime(System.currentTimeMillis());
taskSpec.setTaskInfo(taskInfoJson);
taskSpec.setJobId(alertJobContext.getJobExecutionId());
long taskId = taskDAO.save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
index 40d7d01..23218b4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/classification/ClassificationJobRunner.java
@@ -90,7 +90,6 @@ public class ClassificationJobRunner implements JobRunner {
taskSpec.setTaskType(TaskConstants.TaskType.CLASSIFICATION);
taskSpec.setJobName(classificationJobContext.getJobName());
taskSpec.setStatus(TaskConstants.TaskStatus.WAITING);
- taskSpec.setStartTime(System.currentTimeMillis());
taskSpec.setTaskInfo(taskInfoJson);
taskSpec.setJobId(classificationJobContext.getJobExecutionId());
long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
index 8a2578c..4e4447d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionJobRunner.java
@@ -161,7 +161,6 @@ public class DetectionJobRunner {
taskSpec.setTaskType(TaskType.ANOMALY_DETECTION);
taskSpec.setJobName(detectionJobContext.getJobName());
taskSpec.setStatus(TaskStatus.WAITING);
- taskSpec.setStartTime(System.currentTimeMillis());
taskSpec.setTaskInfo(taskInfoJson);
taskSpec.setJobId(detectionJobContext.getJobExecutionId());
long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
index 7074343..0cf9225 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/monitor/MonitorJobRunner.java
@@ -108,7 +108,6 @@ public class MonitorJobRunner implements JobRunner {
taskSpec.setTaskType(TaskType.MONITOR);
taskSpec.setJobName(monitorJobContext.getJobName());
taskSpec.setStatus(TaskStatus.WAITING);
- taskSpec.setStartTime(System.currentTimeMillis());
taskSpec.setTaskInfo(taskInfoJson);
taskSpec.setJobId(monitorJobContext.getJobExecutionId());
long taskId = taskDAO.save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
index 80ff96f..648e76f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/onboard/DetectionOnboardResource.java
@@ -111,7 +111,6 @@ public class DetectionOnboardResource {
task.setTaskType(TaskConstants.TaskType.REPLAY);
task.setJobName(jobName);
task.setStatus(TaskConstants.TaskStatus.WAITING);
- task.setStartTime(System.currentTimeMillis());
task.setTaskInfo(taskInfoJson);
task.setJobId(anomalyFunctionId);
this.taskDAO.save(task);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index 3e81042..a4e9c7f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -94,6 +94,8 @@ public class TaskDriver {
TaskType taskType = anomalyTaskSpec.getTaskType();
TaskRunner taskRunner = TaskRunnerFactory.getTaskRunnerFromTaskType(taskType);
TaskInfo taskInfo = TaskInfoFactory.getTaskInfoFromTaskType(taskType, anomalyTaskSpec.getTaskInfo());
+
+ updateTaskStartTime(anomalyTaskSpec.getId());
LOG.info("Thread {} : Task Info {}", Thread.currentThread().getId(), taskInfo);
List<TaskResult> taskResults = taskRunner.execute(taskInfo, taskContext);
LOG.info("Thread {} : DONE Executing task: {}", Thread.currentThread().getId(), anomalyTaskSpec.getId());
@@ -196,6 +198,17 @@ public class TaskDriver {
return null;
}
+ private void updateTaskStartTime(long taskId) {
+ LOG.info("Thread {} : Starting updateTaskStartTime for task id {}", Thread.currentThread().getId(), taskId);
+ try {
+ long startTime = System.currentTimeMillis();
+ taskDAO.updateTaskStartTime(taskId, startTime);
+ LOG.info("Thread {} : Updated task start time {}", Thread.currentThread().getId(), startTime);
+ } catch (Exception e) {
+ LOG.error("Exception in updating task start time", e);
+ }
+ }
+
private void updateStatusAndTaskEndTime(long taskId, TaskStatus oldStatus, TaskStatus newStatus, String message) {
LOG.info("Thread {} : Starting updateStatus for task id {}", Thread.currentThread().getId(), taskId);
try {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
index 84e4580..d5697c0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/completeness/checker/DataCompletenessJobRunner.java
@@ -150,7 +150,6 @@ public class DataCompletenessJobRunner implements JobRunner {
taskSpec.setTaskType(TaskType.DATA_COMPLETENESS);
taskSpec.setJobName(dataCompletenessJobContext.getJobName());
taskSpec.setStatus(TaskStatus.WAITING);
- taskSpec.setStartTime(System.currentTimeMillis());
taskSpec.setTaskInfo(taskInfoJson);
taskSpec.setJobId(dataCompletenessJobContext.getJobExecutionId());
long taskId = DAO_REGISTRY.getTaskDAO().save(taskSpec);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
index 38d46a4..a9e79e7 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/TaskManager.java
@@ -40,6 +40,8 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
void updateStatusAndTaskEndTime(Long id, TaskStatus oldStatus, TaskStatus newStatus,
Long taskEndTime, String message);
+ void updateTaskStartTime(Long id, Long taskStartTime);
+
int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status);
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index 141a330..d7a133c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -114,6 +114,13 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
}
@Override
+ public void updateTaskStartTime(Long id, Long taskStartTime) {
+ TaskDTO task = findById(id);
+ task.setStartTime(taskStartTime);
+ save(task);
+ }
+
+ @Override
@Transactional
public int deleteRecordsOlderThanDaysWithStatus(int days, TaskStatus status) {
DateTime expireDate = new DateTime().minusDays(days);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 37cc3f7..65b7452 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -56,7 +56,6 @@ public class DetectionPipelineJob implements Job {
taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION, id));
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
- taskDTO.setStartTime(System.currentTimeMillis());
taskDTO.setTaskInfo(taskInfoJson);
long taskId = taskDAO.save(taskDTO);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
index c7d3eba..4b96846 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertJob.java
@@ -69,7 +69,6 @@ public class DetectionAlertJob implements Job {
taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ALERT);
taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId));
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
- taskDTO.setStartTime(System.currentTimeMillis());
taskDTO.setTaskInfo(taskInfoJson);
long taskId = taskDAO.save(taskDTO);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
index c9621c2..9935079 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/datalayer/bao/TestAnomalyTaskManager.java
@@ -121,6 +121,14 @@ public class TestAnomalyTaskManager {
Assert.assertEquals(anomalyTaskSpecs.size(), 1);
}
+ @Test(dependsOnMethods = {"testCreate"})
+ public void testUpdateTaskStartTime() {
+ long taskStartTime = System.currentTimeMillis();
+ taskDAO.updateTaskStartTime(anomalyTaskId1, taskStartTime);
+ TaskDTO anomalyTask = taskDAO.findById(anomalyTaskId1);
+ Assert.assertEquals(anomalyTask.getStartTime(), taskStartTime);
+ }
+
@Test(dependsOnMethods = {"testFindByJobIdStatusNotIn"})
public void testDeleteRecordOlderThanDaysWithStatus() {
TaskStatus status = TaskStatus.COMPLETED;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org