You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/02/22 23:20:24 UTC
[incubator-pinot] branch master updated: [TE] detection - task scheduler backoff fix (#3866)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1f9d096 [TE] detection - task scheduler backoff fix (#3866)
1f9d096 is described below
commit 1f9d096041ec141040ba7add08039a957d2e6af9
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Fri Feb 22 15:20:19 2019 -0800
[TE] detection - task scheduler backoff fix (#3866)
Fix the task scheduler backoff mechanism to prevent detection tasks from piling up.
---
.../thirdeye/detection/DetectionPipelineJob.java | 25 ++++++++++++++++------
.../detection/alert/DetectionAlertJob.java | 19 +++++++++++++++-
2 files changed, 36 insertions(+), 8 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 2907d7e..3df93d8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -21,6 +21,11 @@ package org.apache.pinot.thirdeye.detection;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
@@ -28,9 +33,6 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -57,7 +59,6 @@ public class DetectionPipelineJob implements Job {
String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
Predicate.EQ("name", jobName),
- Predicate.EQ("startTime", taskInfo.getStart()),
Predicate.OR(
Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
@@ -65,10 +66,20 @@ public class DetectionPipelineJob implements Job {
)
);
- Optional<TaskDTO> latestScheduledTask = scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() > task2.getEndTime() ? task1 : task2);
- if (latestScheduledTask.isPresent() && taskInfo.getEnd() - latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+ List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
+ try {
+ return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
+ .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
+ if (latestScheduledTask.isPresent()
+ && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT) {
// if a task is pending and not time out yet, don't schedule more
- LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName, taskInfo.getStart());
+ LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName,
+ taskInfo.getStart());
return;
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index 499e2f5..f0b448e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -21,11 +21,13 @@ package org.apache.pinot.thirdeye.detection.alert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
@@ -61,6 +63,21 @@ public class DetectionAlertJob implements Job {
DetectionAlertTaskInfo taskInfo = new DetectionAlertTaskInfo(detectionAlertConfigId);
+ // check if a task for this detection alerter is already scheduled
+ String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId);
+ List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+ Predicate.EQ("name", jobName),
+ Predicate.OR(
+ Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+ Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+ ))
+ );
+
+ if (!scheduledTasks.isEmpty()){
+ // if a task is pending and not time out yet, don't schedule more
+ LOG.info("Skip scheduling detection alerter task for {}. Task is already in the queue.", jobName);
+ return;
+ }
String taskInfoJson = null;
try {
taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -70,7 +87,7 @@ public class DetectionAlertJob implements Job {
TaskDTO taskDTO = new TaskDTO();
taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ALERT);
- taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId));
+ taskDTO.setJobName(jobName);
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org