You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/05/17 19:19:35 UTC

[incubator-pinot] branch distribute_tasks created (now 1ebdc7f)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a change to branch distribute_tasks
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 1ebdc7f  [TE] distribute detection and notification tasks

This branch includes the following new commits:

     new 1ebdc7f  [TE] distribute detection and notification tasks

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: [TE] distribute detection and notification tasks

Posted by xh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch distribute_tasks
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1ebdc7fbfc43bcd96a507d7a76166d61c480c3f6
Author: Xiaohui Sun <xh...@xhsun-mn3.linkedin.biz>
AuthorDate: Fri May 17 12:19:19 2019 -0700

    [TE] distribute detection and notification tasks
---
 .../pinot/thirdeye/detection/DetectionPipelineJob.java  | 13 ++++++++++---
 .../thirdeye/detection/alert/DetectionAlertJob.java     | 12 ++++++++++--
 .../detection/alert/DetectionAlertTaskRunner.java       |  7 -------
 .../yaml/CompositePipelineConfigTranslator.java         | 17 +++++++++++++----
 .../yaml/YamlDetectionAlertConfigTranslator.java        | 15 +++++++--------
 .../yaml/YamlDetectionAlertConfigTranslatorTest.java    |  2 +-
 6 files changed, 41 insertions(+), 25 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 6349466..875d728 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
@@ -82,9 +83,15 @@ public class DetectionPipelineJob implements Job {
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 
-    long taskId = taskDAO.save(taskDTO);
-    LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId);
-
+    // Sleep random 0 - 1000 milliseconds to distribute load to mysql.
+    Random random = new Random();
+    try {
+      Thread.sleep(random.nextInt(1000));
+      long taskId = taskDAO.save(taskDTO);
+      LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId);
+    } catch (InterruptedException e) {
+      LOG.error(e.toString());
+    }
   }
 
   private Long getIdFromJobKey(String jobKey) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index bf89932..f11425a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -22,6 +22,7 @@ package org.apache.pinot.thirdeye.detection.alert;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.List;
+import java.util.Random;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
@@ -91,8 +92,15 @@ public class DetectionAlertJob implements Job {
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 
-    long taskId = taskDAO.save(taskDTO);
-    LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
+    // Sleep random 0 - 1000 milliseconds to distribute load to mysql.
+    Random random = new Random();
+    try {
+      Thread.sleep(random.nextInt(1000));
+      long taskId = taskDAO.save(taskDTO);
+      LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
+    } catch (InterruptedException e) {
+      LOG.error(e.toString());
+    }
   }
 
   private Long getIdFromJobKey(String jobKey) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index b4fba8f..d977c39 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -107,13 +107,6 @@ public class DetectionAlertTaskRunner implements TaskRunner {
       DetectionAlertFilter alertFilter = detAlertTaskFactory.loadAlertFilter(alertConfig, System.currentTimeMillis());
       DetectionAlertFilterResult result = alertFilter.run();
 
-      // TODO: The old UI relies on notified tag to display the anomalies. After the migration
-      // we need to clean up all references to notified tag.
-      for (MergedAnomalyResultDTO anomaly : result.getAllAnomalies()) {
-        anomaly.setNotified(true);
-        mergedAnomalyDAO.update(anomaly);
-      }
-
       // Suppress alerts if any and get the filtered anomalies to be notified
       Set<DetectionAlertSuppressor> alertSuppressors = detAlertTaskFactory.loadAlertSuppressors(alertConfig);
       for (DetectionAlertSuppressor alertSuppressor : alertSuppressors) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 9f14ded..e5377e9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
@@ -379,16 +380,24 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     return properties;
   }
 
+  //  Default schedule:
+  //  minute granularity: every 15 minutes, starts at 0 minute
+  //  hourly: every hour, starts at 0 minute
+  //  daily: every day, starts at 2 pm UTC
+  //  others: every day, start at 12 am UTC
   private String buildCron() {
+    // starts at random second to reduce task spike
+    Random random = new Random();
+    String second = Integer.toString(random.nextInt(59));
     switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
       case MINUTES:
-        return "0 0/15 * * * ? *";
+        return second + " 0/15 * * * ? *";
       case HOURS:
-        return "0 0 * * * ? *";
+        return second + " 0 * * * ? *";
       case DAYS:
-        return "0 0 14 * * ? *";
+        return second + " 0 14 * * ? *";
       default:
-        return "0 0 0 * * ?";
+        return second + " 0 0 * * ?";
     }
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
index 9f6d8d9..6b15e98 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
@@ -20,19 +20,14 @@
 package org.apache.pinot.thirdeye.detection.yaml;
 
 import com.google.common.base.CaseFormat;
-import com.google.common.base.Preconditions;
+import java.util.Random;
 import java.util.stream.Collectors;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
-import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.pojo.AlertConfigBean;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionAlertRegistry;
-import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -68,7 +63,8 @@ public class YamlDetectionAlertConfigTranslator {
   static final String PROP_ALERT_SUPPRESSORS = "alertSuppressors";
   static final String PROP_REFERENCE_LINKS = "referenceLinks";
   static final String PROP_TIME_WINDOWS = "timeWindows";
-  static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *"; // Every 5 min
+  // Every 5 minutes. Second needs to be provided.
+  static final String CRON_SCHEDULE_DEFAULT_NO_SECOND = " 0/5 * * * ? *";
 
   private static final String PROP_ONLY_FETCH_LEGACY_ANOMALIES = "onlyFetchLegacyAnomalies";
   private static final String PROP_DIMENSION = "dimension";
@@ -168,7 +164,10 @@ public class YamlDetectionAlertConfigTranslator {
     alertConfigDTO.setApplication(MapUtils.getString(yamlAlertConfig, PROP_APPLICATION));
     alertConfigDTO.setFrom(MapUtils.getString(yamlAlertConfig, PROP_FROM));
 
-    alertConfigDTO.setCronExpression(MapUtils.getString(yamlAlertConfig, PROP_CRON, CRON_SCHEDULE_DEFAULT));
+    // starts at random second to reduce task spike
+    Random random = new Random();
+    String second = Integer.toString(random.nextInt(59));
+    alertConfigDTO.setCronExpression(MapUtils.getString(yamlAlertConfig, PROP_CRON, second + CRON_SCHEDULE_DEFAULT_NO_SECOND));
     alertConfigDTO.setActive(MapUtils.getBooleanValue(yamlAlertConfig, PROP_ACTIVE, true));
 
     alertConfigDTO.setSubjectType(AlertConfigBean.SubjectType.valueOf(
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
index cb57d1c..f2aaeb3 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
@@ -35,7 +35,7 @@ public class YamlDetectionAlertConfigTranslatorTest {
     alertYamlConfigs.put(PROP_SUBS_GROUP_NAME, "test_group_name");
     alertYamlConfigs.put(PROP_APPLICATION, "test_application");
     alertYamlConfigs.put(PROP_FROM, "thirdeye@thirdeye");
-    alertYamlConfigs.put(PROP_CRON, CRON_SCHEDULE_DEFAULT);
+    alertYamlConfigs.put(PROP_CRON, 0 + CRON_SCHEDULE_DEFAULT_NO_SECOND);
     alertYamlConfigs.put(PROP_ACTIVE, true);
     alertYamlConfigs.put(PROP_DETECTION_NAMES, Collections.singletonList("test_pipeline_1"));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org