You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/03/19 00:30:37 UTC

[incubator-pinot] branch master updated: [TE] detection - align metric slices (#3981)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f815e2e  [TE] detection - align metric slices (#3981)
f815e2e is described below

commit f815e2ea569dfe63d74e69e8dff69fe2f5a2b559
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Mar 18 17:30:32 2019 -0700

    [TE] detection - align metric slices (#3981)
    
    Align metric slices to the data granularity.
---
 .../thirdeye/detection/DefaultDataProvider.java    | 42 ++++++++++++++++++++--
 .../detection/wrapper/AnomalyDetectorWrapper.java  |  2 +-
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index a7e9026..935b629 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -39,6 +39,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
@@ -51,10 +53,12 @@ import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.comparison.Row;
 import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
 import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
 import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,11 +158,15 @@ public class DefaultDataProvider implements DataProvider {
   @Override
   public Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices) {
     try {
-      Map<MetricSlice, DataFrame> cacheResult = DETECTION_TIME_SERIES_CACHE.getAll(slices);
+      Map<MetricSlice, MetricSlice> alignedMetricSlicesToOriginalSlice = new HashMap<>();
+      for (MetricSlice slice: slices) {
+        alignedMetricSlicesToOriginalSlice.put(alignSlice(slice), slice);
+      }
+      Map<MetricSlice, DataFrame> cacheResult = DETECTION_TIME_SERIES_CACHE.getAll(alignedMetricSlicesToOriginalSlice.keySet());
       Map<MetricSlice, DataFrame> timeseriesResult = new HashMap<>();
       for (Map.Entry<MetricSlice, DataFrame> entry : cacheResult.entrySet()){
         // make a copy of the result so that cache won't be contaminated by client code
-        timeseriesResult.put(entry.getKey(), entry.getValue().copy());
+        timeseriesResult.put(alignedMetricSlicesToOriginalSlice.get(entry.getKey()), entry.getValue().copy());
       }
       return  timeseriesResult;
     } catch (Exception e) {
@@ -314,6 +322,36 @@ public class DefaultDataProvider implements DataProvider {
     return diff > 0 ? diff : 0;
   }
 
+  /**
+   * Aligns a metric slice to time buckets: start is rounded down and end is rounded up to a multiple of the slice granularity in millis, or of the dataset bucket granularity when the slice granularity equals MetricSlice.NATIVE_GRANULARITY.
+   *
+   * @param slice metric slice; throws IllegalArgumentException if its metric id or the metric's dataset cannot be resolved
+   * @return slice derived from the input with the aligned start, aligned end and the granularity used for alignment
+   */
+  private MetricSlice alignSlice(MetricSlice slice) {
+    MetricConfigDTO metric = this.metricDAO.findById(slice.getMetricId());
+    if (metric == null) {
+      throw new IllegalArgumentException(String.format("Could not resolve metric id %d", slice.getMetricId()));
+    }
+
+    DatasetConfigDTO dataset = this.datasetDAO.findByDataset(metric.getDataset());
+    if (dataset == null) {
+      throw new IllegalArgumentException(String.format("Could not resolve dataset '%s' for metric id %d", metric.getDataset(), slice.getMetricId()));
+    }
+
+    TimeGranularity granularity = dataset.bucketTimeGranularity();
+    if (!MetricSlice.NATIVE_GRANULARITY.equals(slice.getGranularity())) {
+      granularity = slice.getGranularity();
+    }
+
+    // align to time buckets via integer division (floor start, ceil end). NOTE(review): no time zone is applied here, and a granularity of 0 millis would divide by zero - confirm callers
+    long timeGranularity = granularity.toMillis();
+    long start = (slice.getStart() / timeGranularity) * timeGranularity;
+    long end = ((slice.getEnd() + timeGranularity - 1) / timeGranularity) * timeGranularity;
+
+    return slice.withStart(start).withEnd(end).withGranularity(granularity);
+  }
+
   public static void cleanCache() {
     if (DETECTION_TIME_SERIES_CACHE != null) {
       DETECTION_TIME_SERIES_CACHE.cleanUp();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 8450cf6..eb7117e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -153,7 +153,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
         LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time {}", config.getId(), metricUrn, window.getStart(), window.getEnd());
         long ts = System.currentTimeMillis();
         anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
-        LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts);
+        LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds, detected {} anomalies", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
       } catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org