You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/01/07 21:42:50 UTC
[incubator-pinot] branch master updated: [TE] Skip scheduling detection task if one is already in the queue (#3660)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0ea2232 [TE] Skip scheduling detection task if one is already in the queue (#3660)
0ea2232 is described below
commit 0ea2232cdc6c5a2d8d8ce9001c64f80459cf7712
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Jan 7 13:42:45 2019 -0800
[TE] Skip scheduling detection task if one is already in the queue (#3660)
This PR changes the detection scheduler to skip scheduling another detection task if one is already in the queue, to avoid overloading the system.
---
.../thirdeye/detection/DetectionPipelineJob.java | 26 +++++++++++++++++++++-
1 file changed, 25 insertions(+), 1 deletion(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 65b7452..e232b72 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -23,7 +23,11 @@ import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
import com.linkedin.thirdeye.datalayer.bao.TaskManager;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
+import com.linkedin.thirdeye.datalayer.util.Predicate;
import com.linkedin.thirdeye.datasource.DAORegistry;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -37,6 +41,7 @@ public class DetectionPipelineJob implements Job {
private TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
private DetectionConfigManager detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
@@ -45,6 +50,25 @@ public class DetectionPipelineJob implements Job {
DetectionConfigDTO configDTO = detectionDAO.findById(id);
DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(), System.currentTimeMillis());
+ // check if a task for this detection pipeline is already scheduled
+ String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
+ List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+ Predicate.EQ("name", jobName),
+ Predicate.EQ("startTime", taskInfo.getStart()),
+ Predicate.OR(
+ Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+ Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+ )
+ )
+ );
+
+ Optional<TaskDTO> latestScheduledTask = scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() > task2.getEndTime() ? task1 : task2);
+ if (latestScheduledTask.isPresent() && taskInfo.getEnd() - latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+ // if a task is pending and not time out yet, don't schedule more
+ LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName, taskInfo.getStart());
+ return;
+ }
+
String taskInfoJson = null;
try {
taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -54,7 +78,7 @@ public class DetectionPipelineJob implements Job {
TaskDTO taskDTO = new TaskDTO();
taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
- taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION, id));
+ taskDTO.setJobName(jobName);
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org