You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/03/19 00:30:37 UTC
[incubator-pinot] branch master updated: [TE] detection - align
metric slices (#3981)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f815e2e [TE] detection - align metric slices (#3981)
f815e2e is described below
commit f815e2ea569dfe63d74e69e8dff69fe2f5a2b559
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Mar 18 17:30:32 2019 -0700
[TE] detection - align metric slices (#3981)
Align metric slices to the data granularity.
---
.../thirdeye/detection/DefaultDataProvider.java | 42 ++++++++++++++++++++--
.../detection/wrapper/AnomalyDetectorWrapper.java | 2 +-
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index a7e9026..935b629 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -39,6 +39,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
@@ -51,10 +53,12 @@ import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.comparison.Row;
import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
+import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,11 +158,15 @@ public class DefaultDataProvider implements DataProvider {
@Override
public Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices) {
try {
- Map<MetricSlice, DataFrame> cacheResult = DETECTION_TIME_SERIES_CACHE.getAll(slices);
+ Map<MetricSlice, MetricSlice> alignedMetricSlicesToOriginalSlice = new HashMap<>();
+ for (MetricSlice slice: slices) {
+ alignedMetricSlicesToOriginalSlice.put(alignSlice(slice), slice);
+ }
+ Map<MetricSlice, DataFrame> cacheResult = DETECTION_TIME_SERIES_CACHE.getAll(alignedMetricSlicesToOriginalSlice.keySet());
Map<MetricSlice, DataFrame> timeseriesResult = new HashMap<>();
for (Map.Entry<MetricSlice, DataFrame> entry : cacheResult.entrySet()){
// make a copy of the result so that cache won't be contaminated by client code
- timeseriesResult.put(entry.getKey(), entry.getValue().copy());
+ timeseriesResult.put(alignedMetricSlicesToOriginalSlice.get(entry.getKey()), entry.getValue().copy());
}
return timeseriesResult;
} catch (Exception e) {
@@ -314,6 +322,36 @@ public class DefaultDataProvider implements DataProvider {
return diff > 0 ? diff : 0;
}
+ /**
+ * Aligns a metric slice to time-bucket boundaries, using the slice's own granularity unless it is NATIVE_GRANULARITY, in which case the dataset's bucket granularity is used.
+ * The start is rounded down and the end is rounded up to a multiple of the granularity in milliseconds (integer arithmetic; assumes non-negative timestamps).
+ * @param slice metric slice; an IllegalArgumentException is thrown if its metric id, or that metric's dataset, cannot be resolved
+ * @return the slice with aligned start, aligned end, and the granularity that was used for alignment
+ */
+ private MetricSlice alignSlice(MetricSlice slice) {
+ MetricConfigDTO metric = this.metricDAO.findById(slice.getMetricId());
+ if (metric == null) {
+ throw new IllegalArgumentException(String.format("Could not resolve metric id %d", slice.getMetricId()));
+ }
+
+ DatasetConfigDTO dataset = this.datasetDAO.findByDataset(metric.getDataset());
+ if (dataset == null) {
+ throw new IllegalArgumentException(String.format("Could not resolve dataset '%s' for metric id %d", metric.getDataset(), slice.getMetricId()));
+ }
+
+ TimeGranularity granularity = dataset.bucketTimeGranularity();
+ if (!MetricSlice.NATIVE_GRANULARITY.equals(slice.getGranularity())) {
+ granularity = slice.getGranularity();
+ }
+
+ // align to fixed-size time buckets using plain millisecond arithmetic; NOTE(review): no time zone is applied here - confirm that is intended
+ long timeGranularity = granularity.toMillis();
+ long start = (slice.getStart() / timeGranularity) * timeGranularity;
+ long end = ((slice.getEnd() + timeGranularity - 1) / timeGranularity) * timeGranularity;
+
+ return slice.withStart(start).withEnd(end).withGranularity(granularity);
+ }
+
public static void cleanCache() {
if (DETECTION_TIME_SERIES_CACHE != null) {
DETECTION_TIME_SERIES_CACHE.cleanUp();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 8450cf6..eb7117e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -153,7 +153,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time {}", config.getId(), metricUrn, window.getStart(), window.getEnd());
long ts = System.currentTimeMillis();
anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
- LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts);
+ LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds, detected {} anomalies", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
} catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org