You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2018/11/26 20:00:28 UTC

[incubator-pinot] branch master updated: [TE] Pipeline migration - fill in metric timezone (#3542)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d48499b  [TE] Pipeline migration - fill in metric timezone (#3542)
d48499b is described below

commit d48499beabc0dfce1ff88ee597892109c5821c8a
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Nov 26 12:00:23 2018 -0800

    [TE] Pipeline migration - fill in metric timezone (#3542)
    
    - fill in timeZone information for algorithm detector and anomaly filter
    - error handling in the detection
---
 .../dashboard/ThirdEyeDashboardApplication.java     |  2 +-
 .../detection/DetectionMigrationResource.java       | 17 ++++++++++++++---
 .../detection/wrapper/AnomalyDetectorWrapper.java   | 21 ++++++++++++++-------
 3 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index 4349587..ef494b8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -174,7 +174,7 @@ public class ThirdEyeDashboardApplication
     env.jersey().register(new AnomaliesResource(anomalyFunctionFactory, alertFilterFactory));
     env.jersey().register(new DetectionMigrationResource(
         DAO_REGISTRY.getMetricConfigDAO(), DAO_REGISTRY.getAnomalyFunctionDAO(),
-        DAO_REGISTRY.getDetectionConfigManager(), anomalyFunctionFactory, alertFilterFactory));
+        DAO_REGISTRY.getDetectionConfigManager(), DAO_REGISTRY.getDatasetConfigDAO(), anomalyFunctionFactory, alertFilterFactory));
     env.jersey().register(new OnboardResource(config));
     env.jersey().register(new EntityMappingResource());
     env.jersey().register(new OnboardDatasetMetricResource());
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index 28e30f2..c3d08d1 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -19,9 +19,11 @@ package com.linkedin.thirdeye.detection;
 import com.google.common.collect.ImmutableMap;
 import com.linkedin.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
 import com.linkedin.thirdeye.datalayer.bao.AnomalyFunctionManager;
+import com.linkedin.thirdeye.datalayer.bao.DatasetConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.MetricConfigManager;
 import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
@@ -60,6 +62,7 @@ public class DetectionMigrationResource {
   private final LegacyAnomalyFunctionTranslator translator;
   private final AnomalyFunctionManager anomalyFunctionDAO;
   private final DetectionConfigManager detectionConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
   private final Yaml yaml;
 
   /**
@@ -68,10 +71,13 @@ public class DetectionMigrationResource {
    * @param anomalyFunctionFactory the anomaly function factory
    */
   public DetectionMigrationResource(MetricConfigManager metricConfigDAO, AnomalyFunctionManager anomalyFunctionDAO,
-      DetectionConfigManager detectionConfigDAO, AnomalyFunctionFactory anomalyFunctionFactory,
+      DetectionConfigManager detectionConfigDAO,
+      DatasetConfigManager datasetConfigDAO,
+      AnomalyFunctionFactory anomalyFunctionFactory,
       AlertFilterFactory alertFilterFactory) {
     this.anomalyFunctionDAO = anomalyFunctionDAO;
     this.detectionConfigDAO = detectionConfigDAO;
+    this.datasetConfigDAO = datasetConfigDAO;
     this.translator = new LegacyAnomalyFunctionTranslator(metricConfigDAO, anomalyFunctionFactory, alertFilterFactory);
     DumperOptions options = new DumperOptions();
     options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
@@ -184,10 +190,15 @@ public class DetectionMigrationResource {
     filterYamlParams.put("configuration", params);
     params.putAll(functionDTO.getAlertFilter());
     params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
-    // TODO  timezone
+    params.put("variables.timeZone", getTimezone(functionDTO));
     return filterYamlParams;
   }
 
+  private String getTimezone(AnomalyFunctionDTO functionDTO) {
+    DatasetConfigDTO datasetConfigDTO = this.datasetConfigDAO.findByDataset(functionDTO.getCollection());
+    return datasetConfigDTO.getTimezone();
+  }
+
   private String getBucketPeriod(AnomalyFunctionDTO functionDTO) {
     return new Period(TimeUnit.MILLISECONDS.convert(functionDTO.getBucketSize(), functionDTO.getBucketUnit())).toString();
   }
@@ -201,7 +212,7 @@ public class DetectionMigrationResource {
       params.put((String) property.getKey(), property.getValue());
     }
     params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
-    // TODO  timezone
+    params.put("variables.timeZone", getTimezone(functionDTO));
     if (functionDTO.getWindowDelay() != 0) {
       detectorYaml.put(PROP_WINDOW_DELAY, functionDTO.getWindowDelay());
       detectorYaml.put(PROP_WINDOW_DELAY_UNIT, functionDTO.getWindowDelayUnit().toString());
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index e8e938f..a650550 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -73,6 +73,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private final TimeUnit windowDelayUnit;
   private final int windowSize;
   private final TimeUnit windowUnit;
+  private final MetricConfigDTO metric;
+  private final MetricEntity metricEntity;
   private final boolean isMovingWindowDetection;
   private DatasetConfigDTO dataset;
   private DateTimeZone dateTimeZone;
@@ -84,6 +86,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     super(provider, config, startTime, endTime);
 
     this.metricUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
+    this.metricEntity = MetricEntity.fromURN(this.metricUrn);
+    this.metric = provider.fetchMetrics(Collections.singleton(this.metricEntity.getId())).get(this.metricEntity.getId());
 
     Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_DETECTOR));
     this.detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
@@ -106,18 +110,21 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     List<Interval> monitoringWindows = this.getMonitoringWindows();
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     for (Interval window : monitoringWindows) {
-      anomalies.addAll(anomalyDetector.runDetection(window, this.metricUrn));
+      List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
+      try {
+        anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
+      } catch (Exception e) {
+        LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
+      }
+      anomalies.addAll(anomaliesForOneWindow);
     }
 
-    MetricEntity me = MetricEntity.fromURN(this.metricUrn);
-    MetricConfigDTO metric = provider.fetchMetrics(Collections.singleton(me.getId())).get(me.getId());
-
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       anomaly.setDetectionConfigId(this.config.getId());
       anomaly.setMetricUrn(this.metricUrn);
-      anomaly.setMetric(metric.getName());
-      anomaly.setCollection(metric.getDataset());
-      anomaly.setDimensions(DetectionUtils.toFilterMap(me.getFilters()));
+      anomaly.setMetric(this.metric.getName());
+      anomaly.setCollection(this.metric.getDataset());
+      anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
       anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_KEY, this.detectorReferenceKey);
     }
     return new DetectionPipelineResult(anomalies);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org