Posted to commits@pinot.apache.org by xh...@apache.org on 2019/03/29 18:32:07 UTC

[incubator-pinot] branch master updated: [TE] set max detection task window to 7 days (#4030)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cd2082  [TE] set max detection task window to 7 days (#4030)
3cd2082 is described below

commit 3cd2082505212a4d43c314d8fd4e089a694d40d9
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Fri Mar 29 11:32:02 2019 -0700

    [TE] set max detection task window to 7 days (#4030)
---
 .../thirdeye/detection/DetectionPipelineJob.java   | 54 +++++++++++++---------
 1 file changed, 32 insertions(+), 22 deletions(-)
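
For readers skimming the diff below: the core of this change is clamping each detection task's start time so it never falls more than seven days before the task's end time (the current wall-clock time). A minimal standalone sketch of that arithmetic, using hypothetical timestamps rather than the pipeline's own DTOs:

    import java.util.concurrent.TimeUnit;

    public class LookbackClampSketch {
      // Mirrors DETECTION_TASK_MAX_LOOKBACK_WINDOW from the commit: 7 days in millis.
      private static final long MAX_LOOKBACK_MS = TimeUnit.DAYS.toMillis(7);

      public static void main(String[] args) {
        long end = System.currentTimeMillis();
        // Hypothetical last-processed timestamp: 30 days ago, i.e. well outside the window.
        long lastTimestamp = end - TimeUnit.DAYS.toMillis(30);

        // Same clamp as the commit: never look back further than MAX_LOOKBACK_MS.
        long start = Math.max(lastTimestamp, end - MAX_LOOKBACK_MS);

        // Prints 7: the window is capped even though the last timestamp is 30 days old.
        System.out.println("window length in days: " + TimeUnit.MILLISECONDS.toDays(end - start));
      }
    }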

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 3df93d8..6349466 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -47,37 +47,23 @@ public class DetectionPipelineJob implements Job {
   private DetectionConfigManager detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
+  private static final long DETECTION_TASK_MAX_LOOKBACK_WINDOW = TimeUnit.DAYS.toMillis(7);
 
   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
     JobKey jobKey = jobExecutionContext.getJobDetail().getKey();
     Long id = getIdFromJobKey(jobKey.getName());
     DetectionConfigDTO configDTO = detectionDAO.findById(id);
-    DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(), System.currentTimeMillis());
 
-    // check if a task for this detection pipeline is already scheduled
+    // Make sure the start time is no more than DETECTION_TASK_MAX_LOOKBACK_WINDOW before the end time
+    long end = System.currentTimeMillis();
+    long start = Math.max(configDTO.getLastTimestamp(), end - DETECTION_TASK_MAX_LOOKBACK_WINDOW);
+    DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), start, end);
+
     String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
-    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
-        Predicate.EQ("name", jobName),
-        Predicate.OR(
-            Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
-            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
-        )
-      )
-    );
 
-    List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
-      try {
-        return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }).collect(Collectors.toList());
-    Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
-        .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
-    if (latestScheduledTask.isPresent()
-        && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT) {
-      // if a task is pending and not time out yet, don't schedule more
+    // if a task is pending and not timed out yet, don't schedule more
+    if (checkTaskAlreadyRun(jobName, taskInfo)) {
       LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName,
           taskInfo.getStart());
       return;
@@ -106,6 +92,30 @@ public class DetectionPipelineJob implements Job {
     String id = tokens[tokens.length - 1];
     return Long.valueOf(id);
   }
+
+  private boolean checkTaskAlreadyRun(String jobName, DetectionPipelineTaskInfo taskInfo) {
+    // check if a task for this detection pipeline is already scheduled
+    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+        Predicate.EQ("name", jobName),
+        Predicate.OR(
+            Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+        )
+        )
+    );
+
+    List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
+      try {
+        return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+    Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
+        .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
+    return latestScheduledTask.isPresent()
+        && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT;
+  }
 }
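
For context on the second hunk: the pending-task check is unchanged in substance, it is only extracted into checkTaskAlreadyRun. A simplified sketch of the decision it makes, with the DAO lookup and Jackson deserialization replaced by a plain list of hypothetical pending-task end timestamps:

    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.TimeUnit;

    public class PendingTaskCheckSketch {
      // Mirrors DETECTION_TASK_TIMEOUT from the commit: 1 day in millis.
      private static final long TASK_TIMEOUT_MS = TimeUnit.DAYS.toMillis(1);

      // Returns true when a RUNNING/WAITING task already covers roughly the same window,
      // i.e. the newest pending task's end is less than TASK_TIMEOUT_MS behind the new end.
      static boolean alreadyScheduled(List<Long> pendingTaskEnds, long newTaskEnd) {
        Optional<Long> latestEnd = pendingTaskEnds.stream().max(Long::compare);
        return latestEnd.isPresent() && newTaskEnd - latestEnd.get() < TASK_TIMEOUT_MS;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Pending task whose window ended two hours ago: still fresh, so skip scheduling (true).
        System.out.println(alreadyScheduled(List.of(now - TimeUnit.HOURS.toMillis(2)), now));
        // Pending task from two days ago: considered timed out, so a new task is scheduled (false).
        System.out.println(alreadyScheduled(List.of(now - TimeUnit.DAYS.toMillis(2)), now));
      }
    }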
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org