You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/02/22 23:20:24 UTC

[incubator-pinot] branch master updated: [TE] detection - task scheduler backoff fix (#3866)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f9d096  [TE] detection - task scheduler backoff fix (#3866)
1f9d096 is described below

commit 1f9d096041ec141040ba7add08039a957d2e6af9
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Fri Feb 22 15:20:19 2019 -0800

    [TE] detection - task scheduler backoff fix (#3866)
    
    Fix the task scheduler backoff mechanism to prevent detection tasks from piling up.
---
 .../thirdeye/detection/DetectionPipelineJob.java   | 25 ++++++++++++++++------
 .../detection/alert/DetectionAlertJob.java         | 19 +++++++++++++++-
 2 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 2907d7e..3df93d8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -21,6 +21,11 @@ package org.apache.pinot.thirdeye.detection;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
@@ -28,9 +33,6 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -57,7 +59,6 @@ public class DetectionPipelineJob implements Job {
     String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
     List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
         Predicate.EQ("name", jobName),
-        Predicate.EQ("startTime", taskInfo.getStart()),
         Predicate.OR(
             Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
             Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
@@ -65,10 +66,20 @@ public class DetectionPipelineJob implements Job {
       )
     );
 
-    Optional<TaskDTO> latestScheduledTask = scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() > task2.getEndTime() ? task1 : task2);
-    if (latestScheduledTask.isPresent() && taskInfo.getEnd() - latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+    List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
+      try {
+        return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+    Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
+        .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
+    if (latestScheduledTask.isPresent()
+        && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT) {
       // if a task is pending and not time out yet, don't schedule more
-      LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName, taskInfo.getStart());
+      LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName,
+          taskInfo.getStart());
       return;
     }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index 499e2f5..f0b448e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -21,11 +21,13 @@ package org.apache.pinot.thirdeye.detection.alert;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
@@ -61,6 +63,21 @@ public class DetectionAlertJob implements Job {
 
     DetectionAlertTaskInfo taskInfo = new DetectionAlertTaskInfo(detectionAlertConfigId);
 
+    // check if a task for this detection alerter is already scheduled
+    String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId);
+    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+        Predicate.EQ("name", jobName),
+        Predicate.OR(
+            Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+        ))
+    );
+
+    if (!scheduledTasks.isEmpty()){
+      // if a task for this alerter is already running or waiting, don't schedule another one
+      LOG.info("Skip scheduling detection alerter task for {}. Task is already in the queue.", jobName);
+      return;
+    }
     String taskInfoJson = null;
     try {
       taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -70,7 +87,7 @@ public class DetectionAlertJob implements Job {
 
     TaskDTO taskDTO = new TaskDTO();
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ALERT);
-    taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId));
+    taskDTO.setJobName(jobName);
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org