You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/01/07 21:42:50 UTC

[incubator-pinot] branch master updated: [TE] Skip scheduling detection task if one is already in the queue (#3660)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ea2232  [TE] Skip scheduling detection task if one is already in the queue (#3660)
0ea2232 is described below

commit 0ea2232cdc6c5a2d8d8ce9001c64f80459cf7712
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Jan 7 13:42:45 2019 -0800

    [TE] Skip scheduling detection task if one is already in the queue (#3660)
    
    This PR changes the detection scheduler to skip scheduling additional detection tasks if one is already in the queue, to avoid system overload.
---
 .../thirdeye/detection/DetectionPipelineJob.java   | 26 +++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 65b7452..e232b72 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -23,7 +23,11 @@ import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.TaskManager;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
+import com.linkedin.thirdeye.datalayer.util.Predicate;
 import com.linkedin.thirdeye.datasource.DAORegistry;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -37,6 +41,7 @@ public class DetectionPipelineJob implements Job {
   private TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
   private DetectionConfigManager detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
 
   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
@@ -45,6 +50,25 @@ public class DetectionPipelineJob implements Job {
     DetectionConfigDTO configDTO = detectionDAO.findById(id);
     DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(), System.currentTimeMillis());
 
+    // check if a task for this detection pipeline is already scheduled
+    String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
+    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+        Predicate.EQ("name", jobName),
+        Predicate.EQ("startTime", taskInfo.getStart()),
+        Predicate.OR(
+            Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+        )
+      )
+    );
+
+    Optional<TaskDTO> latestScheduledTask = scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() > task2.getEndTime() ? task1 : task2);
+    if (latestScheduledTask.isPresent() && taskInfo.getEnd() - latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+      // if a task is pending and has not timed out yet, don't schedule more
+      LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName, taskInfo.getStart());
+      return;
+    }
+
     String taskInfoJson = null;
     try {
       taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -54,7 +78,7 @@ public class DetectionPipelineJob implements Job {
 
     TaskDTO taskDTO = new TaskDTO();
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
-    taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION, id));
+    taskDTO.setJobName(jobName);
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org