You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pinot.apache.org by GitBox <gi...@apache.org> on 2019/01/07 21:42:46 UTC

[GitHub] jihaozh closed pull request #3660: [TE] Skip scheduling detection task if one is already in the queue

jihaozh closed pull request #3660: [TE] Skip scheduling detection task if one is already in the queue
URL: https://github.com/apache/incubator-pinot/pull/3660
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 65b7452a55..e232b72a54 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -23,7 +23,11 @@
 import com.linkedin.thirdeye.datalayer.bao.TaskManager;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
+import com.linkedin.thirdeye.datalayer.util.Predicate;
 import com.linkedin.thirdeye.datasource.DAORegistry;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -37,6 +41,7 @@
   private TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
   private DetectionConfigManager detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
 
   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
@@ -45,6 +50,25 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
     DetectionConfigDTO configDTO = detectionDAO.findById(id);
     DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(), System.currentTimeMillis());
 
+    // check if a task for this detection pipeline is already scheduled
+    String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
+    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+        Predicate.EQ("name", jobName),
+        Predicate.EQ("startTime", taskInfo.getStart()),
+        Predicate.OR(
+            Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+        )
+      )
+    );
+
+    Optional<TaskDTO> latestScheduledTask = scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() > task2.getEndTime() ? task1 : task2);
+    if (latestScheduledTask.isPresent() && taskInfo.getEnd() - latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+      // if a task is pending and has not timed out yet, don't schedule another one
+      LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName, taskInfo.getStart());
+      return;
+    }
+
     String taskInfoJson = null;
     try {
       taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -54,7 +78,7 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
 
     TaskDTO taskDTO = new TaskDTO();
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
-    taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION, id));
+    taskDTO.setJobName(jobName);
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org
For additional commands, e-mail: dev-help@pinot.apache.org