You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/05/21 21:20:22 UTC
[incubator-pinot] branch master updated: [TE] distribute detection
and notification tasks (#4217)
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 866289c [TE] distribute detection and notification tasks (#4217)
866289c is described below
commit 866289cf2671316d73195434c5aabd8e315ba18f
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Tue May 21 14:20:17 2019 -0700
[TE] distribute detection and notification tasks (#4217)
* [TE] distribute detection and notification tasks
* [TE] fix integration test by adding cron explicitly
* [TE] revert removing notification flag
* [TE] randomization when creating the task
* [TE] randomization when creating the task
* [TE] Distribute job load to add random delays
* [TE] Fix minor bug when logging sleep time
---
.../pinot/thirdeye/detection/DetectionPipelineJob.java | 17 ++++++++++++++---
.../thirdeye/detection/alert/DetectionAlertJob.java | 16 ++++++++++++++--
.../yaml/CompositePipelineConfigTranslator.java | 5 +++++
.../yaml/YamlDetectionAlertConfigTranslator.java | 3 ++-
4 files changed, 35 insertions(+), 6 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 6349466..99f2708 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
@@ -82,9 +83,19 @@ public class DetectionPipelineJob implements Job {
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
- long taskId = taskDAO.save(taskDTO);
- LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId);
-
+ // TODO: revisit it after identifying bottlenecks
+ // Here will write the task information to mysql.
+ // Sleep random 0 - 5 seconds to distribute load to mysql.
+ Random random = new Random();
+ try {
+ int sleepTime = random.nextInt(5000);
+ LOG.info("Wait for " + sleepTime + " milliseconds.");
+ Thread.sleep(sleepTime);
+ long taskId = taskDAO.save(taskDTO);
+ LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId);
+ } catch (InterruptedException e) {
+ LOG.error(e.toString());
+ }
}
private Long getIdFromJobKey(String jobKey) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index bf89932..41ebcd5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -22,6 +22,7 @@ package org.apache.pinot.thirdeye.detection.alert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
+import java.util.Random;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
@@ -91,8 +92,19 @@ public class DetectionAlertJob implements Job {
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
- long taskId = taskDAO.save(taskDTO);
- LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
+ // TODO: revisit it after identifying bottlenecks
+ // Here will write the task information to mysql.
+ // Sleep random 0 - 5 seconds to distribute load to mysql.
+ Random random = new Random();
+ try {
+ int sleepTime = random.nextInt(5000);
+ LOG.info("Wait for " + sleepTime + " milliseconds.");
+ Thread.sleep(sleepTime);
+ long taskId = taskDAO.save(taskDTO);
+ LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
+ } catch (InterruptedException e) {
+ LOG.error(e.toString());
+ }
}
private Long getIdFromJobKey(String jobKey) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 5c30ae2..dde36f4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -401,6 +401,11 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
return properties;
}
+ // Default schedule:
+ // minute granularity: every 15 minutes, starts at 0 minute
+ // hourly: every hour, starts at 0 minute
+ // daily: every day, starts at 2 pm UTC
+ // others: every day, start at 12 am UTC
private String buildCron() {
switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
case MINUTES:
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
index 04501ab..6b90e1b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
@@ -63,7 +63,8 @@ public class YamlDetectionAlertConfigTranslator {
static final String PROP_ALERT_SUPPRESSORS = "alertSuppressors";
static final String PROP_REFERENCE_LINKS = "referenceLinks";
static final String PROP_TIME_WINDOWS = "timeWindows";
- static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *"; // Every 5 min
+ // Every 5 minutes.
+ static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *";
private static final String PROP_DIMENSION = "dimension";
private static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org