You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/03/29 18:32:07 UTC
[incubator-pinot] branch master updated: [TE] set max detection task window to 7 days (#4030)
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3cd2082 [TE] set max detection task window to 7 days (#4030)
3cd2082 is described below
commit 3cd2082505212a4d43c314d8fd4e089a694d40d9
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Fri Mar 29 11:32:02 2019 -0700
[TE] set max detection task window to 7 days (#4030)
---
.../thirdeye/detection/DetectionPipelineJob.java | 54 +++++++++++++---------
1 file changed, 32 insertions(+), 22 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 3df93d8..6349466 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -47,37 +47,23 @@ public class DetectionPipelineJob implements Job {
private DetectionConfigManager detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
+ private static final long DETECTION_TASK_MAX_LOOKBACK_WINDOW = TimeUnit.DAYS.toMillis(7);
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobKey jobKey = jobExecutionContext.getJobDetail().getKey();
Long id = getIdFromJobKey(jobKey.getName());
DetectionConfigDTO configDTO = detectionDAO.findById(id);
- DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(), System.currentTimeMillis());
- // check if a task for this detection pipeline is already scheduled
+ // Make sure start time is not out of DETECTION_TASK_MAX_LOOKBACK_WINDOW
+ long end = System.currentTimeMillis();
+ long start = Math.max(configDTO.getLastTimestamp(), end - DETECTION_TASK_MAX_LOOKBACK_WINDOW);
+ DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), start, end);
+
String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
- List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
- Predicate.EQ("name", jobName),
- Predicate.OR(
- Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
- Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
- )
- )
- );
- List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
- try {
- return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
- Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
- .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
- if (latestScheduledTask.isPresent()
- && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT) {
- // if a task is pending and not time out yet, don't schedule more
+ // if a task is pending and not time out yet, don't schedule more
+ if (checkTaskAlreadyRun(jobName, taskInfo)) {
LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName,
taskInfo.getStart());
return;
@@ -106,6 +92,30 @@ public class DetectionPipelineJob implements Job {
String id = tokens[tokens.length - 1];
return Long.valueOf(id);
}
+
+ private boolean checkTaskAlreadyRun(String jobName, DetectionPipelineTaskInfo taskInfo ) {
+ // Returns true if the latest RUNNING/WAITING task named jobName ends less than DETECTION_TASK_TIMEOUT before taskInfo ends (a pending task that has not timed out).
+ List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+ Predicate.EQ("name", jobName),
+ Predicate.OR(
+ Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+ Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+ )
+ )
+ );
+ // NOTE(review): only RUNNING/WAITING tasks match, despite the name 'AlreadyRun'; unparseable task info aborts via RuntimeException.
+ List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
+ try {
+ return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
+ .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
+ return latestScheduledTask.isPresent()
+ && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org