You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/07/24 22:01:07 UTC

[GitHub] [incubator-pinot] HoraceChoi95 commented on a change in pull request #4466: Adding Mean Variance Rule Detector

HoraceChoi95 commented on a change in pull request #4466: Adding Mean Variance Rule Detector
URL: https://github.com/apache/incubator-pinot/pull/4466#discussion_r307040233
 
 

 ##########
 File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/WoWStdRuleDetector.java
 ##########
 @@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.components;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
+import org.apache.pinot.thirdeye.dataframe.LongSeries;
+import org.apache.pinot.thirdeye.dataframe.Series;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.Pattern;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.spec.WoWStdRuleDetectorSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.dataframe.DoubleSeries.*;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+@Components(title = "WoW mean and std-dev forecasting and detection",
+    type = "WOW_STD_RULE",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Forecast using history WoW mean and std-dev",
+    params = {
+        @Param(name = "offset", defaultValue = "wo1w"),
+        @Param(name = "pattern", allowableValues = {"up", "down"}),
+        @Param(name = "lookback"),
+        @Param(name = "sensitivity")})
+public class WoWStdRuleDetector implements AnomalyDetector<WoWStdRuleDetectorSpec>,
+                                           BaselineProvider<WoWStdRuleDetectorSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(WoWStdRuleDetector.class);
+
+  // Column names used by the intermediate DataFrames built in this detector.
+  private static final String COL_CURR = "current";
+  private static final String COL_ANOMALY = "anomaly";
+  private static final String COL_PATTERN = "pattern";
+  private static final String COL_DIFF = "diff";
+  private static final String COL_DIFF_VIOLATION = "diff_violation";
+  private static final String COL_ERROR = "error";
+  private static final String COL_CHANGE = "change";
+
+  // Per-instance configuration, populated from the spec in init().
+  private InputDataFetcher dataFetcher;   // source of dataset configs and metric data
+  private Pattern pattern;                // which deviation direction(s) count as anomalous
+  private String monitoringGranularity;   // raw granularity string from the spec ("1_MONTHS" is special-cased)
+  private TimeGranularity timeGranularity; // parsed granularity used for slicing and lookback math
+  private double sensitivity;             // NOTE(review): read in init() but not referenced in the visible code
+  private int lookback;                   // number of granularity periods of history to fetch
+
+  @Override
+  public void init(WoWStdRuleDetectorSpec spec, InputDataFetcher dataFetcher) {
+    this.dataFetcher = dataFetcher;
+    this.pattern = spec.getPattern();
+    this.lookback = spec.getLookback();
+    this.sensitivity = spec.getSensitivity();
+    this.monitoringGranularity = spec.getMonitoringGranularity();
+
+    if (this.monitoringGranularity.equals("1_MONTHS")) {
+      this.timeGranularity = MetricSlice.NATIVE_GRANULARITY;
+    } else {
+      this.timeGranularity = TimeGranularity.fromString(spec.getMonitoringGranularity());
+    }
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    MetricEntity metricEntity = MetricEntity.fromSlice(slice, 0);
+    Interval window = new Interval(slice.getStart(), slice.getEnd());
+    DateTime trainStart;
+
+    if (isMultiDayGranularity()) {
+      trainStart = window.getStart().minusDays(timeGranularity.getSize() * lookback);
+    } else if (this.monitoringGranularity.equals("1_MONTHS")) {
+      trainStart = window.getStart().minusMonths(lookback);
+    } else {
+      trainStart = window.getStart().minusWeeks(lookback);
+    }
+
+    DatasetConfigDTO datasetConfig = this.dataFetcher.fetchData(new InputDataSpec()
+        .withMetricIdsForDataset(Collections.singleton(metricEntity.getId()))).getDatasetForMetricId()
+        .get(metricEntity.getId());
+    DataFrame inputDf = fetchData(metricEntity, trainStart.getMillis(), window.getEndMillis());
+    DataFrame resultDF = computePredictionInterval(inputDf, window.getStartMillis(), datasetConfig.getTimezone());
+    resultDF = resultDF.joinLeft(inputDf.renameSeries(COL_VALUE, COL_CURR), COL_TIME);
+
+    // Exclude the end because baseline calculation should not contain the end
+    if (resultDF.size() > 1) {
+      resultDF = resultDF.head(resultDF.size() - 1);
+    }
+
+    return TimeSeries.fromDataFrame(resultDF);
+  }
+
+  @Override
+  public DetectionResult runDetection(Interval window, String metricUrn) {
+    MetricEntity me = MetricEntity.fromURN(metricUrn);
+    DateTime fetchStart;
+    //get historical data
+    if (isMultiDayGranularity()) {
+      fetchStart = window.getStart().minusDays(timeGranularity.getSize() * lookback);
+    } else if (this.monitoringGranularity.equals("1_MONTHS")) {
+      fetchStart = window.getStart().minusMonths(lookback);
+    } else {
+      fetchStart = window.getStart().minusWeeks(lookback);
+    }
+
+    MetricSlice slice = MetricSlice.from(me.getId(), fetchStart.getMillis(), window.getEndMillis(), me.getFilters(), timeGranularity);
+    DatasetConfigDTO datasetConfig = this.dataFetcher.fetchData(new InputDataSpec()
+        .withMetricIdsForDataset(Collections.singleton(me.getId()))).getDatasetForMetricId()
+        .get(me.getId());
+    // getting data (window + earliest lookback) all at once.
+    LOG.info("Getting data for" + slice.toString());
+    DataFrame dfInput = fetchData(me, fetchStart.getMillis(), window.getEndMillis());
+    DataFrame dfCurr = new DataFrame(dfInput).renameSeries(COL_VALUE, COL_CURR);
+    DataFrame dfBase = computePredictionInterval(dfInput, window.getStartMillis(), datasetConfig.getTimezone());
+    DataFrame df = new DataFrame(dfCurr).addSeries(dfBase, COL_VALUE, COL_ERROR);
+    df.addSeries(COL_DIFF, df.getDoubles(COL_CURR).subtract(df.get(COL_VALUE)));
+    df.addSeries(COL_ANOMALY, BooleanSeries.fillValues(df.size(), false));
+
+    // Filter pattern
+    if (pattern.equals(Pattern.UP_OR_DOWN) ) {
+      df.addSeries(COL_PATTERN, BooleanSeries.fillValues(df.size(), true));
+    } else {
+      df.addSeries(COL_PATTERN, pattern.equals(Pattern.UP) ? df.getDoubles(COL_DIFF).gt(0) :
+          df.getDoubles(COL_DIFF).lt(0));
+    }
+    df.addSeries(COL_DIFF_VIOLATION, df.getDoubles(COL_DIFF).abs().gte(df.getDoubles(COL_ERROR)));
+//    df.addSeries(COL_DIFF_VIOLATION, map((Series.BooleanFunction) values -> values[0] || values[1],
+//        df.getDoubles(COL_DIFF).gte(df.getDoubles(COL_UP_ERROR)),df.getDoubles(COL_DIFF).lte(df.getDoubles(COL_DOWN_ERROR))));
+    df.mapInPlace(BooleanSeries.ALL_TRUE, COL_ANOMALY, COL_PATTERN, COL_DIFF_VIOLATION);
+
+    // Anomalies
+    List<MergedAnomalyResultDTO> anomalyResults = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY,
+        window.getEndMillis(),
+        DetectionUtils.getMonitoringGranularityPeriod(timeGranularity.toAggregationGranularityString(),
+            datasetConfig), datasetConfig);
+    dfBase = dfBase.joinRight(df.retainSeries(COL_TIME, COL_CURR), COL_TIME);
+    DetectionResult dr1 = DetectionResult.from(anomalyResults, TimeSeries.fromDataFrame(dfBase));
+    return DetectionResult.from(anomalyResults, TimeSeries.fromDataFrame(dfBase));
+  }
+
+  private DataFrame computePredictionInterval(DataFrame inputDF, long windowStartTime, String timezone) {
+
+    DataFrame resultDF = new DataFrame();
+    //filter the data inside window for current values.
+    DataFrame forecastDF = inputDF.filter(new Series.LongConditional() {
+      @Override
+      public boolean apply(long... values) {
+        return values[0] >= windowStartTime;
+      }
+    }, COL_TIME).dropNull();
+
+    int size = forecastDF.size();
+    double[] baselineArray = new double[size];
+    double[] upperBoundArray = new double[size];
+    double[] lowerBoundArray = new double[size];
+    long[] resultTimeArray = new long[size];
+    double[] errorArray = new double[size];
+    double[] std = new double[size];
+    double[] mean = new double[size];
+
+    //get the trainingDF for each week, which is the number of lookback to 1 week before the each week predict start time
+    for (int k = 0; k < size; k++) {
+      DataFrame trainingDF;
+      trainingDF = getLookbackDF(inputDF, forecastDF.getLong(COL_TIME, k));
+      //the get historical WoW mean and std.
+      std[k]= trainingDF.getDoubles(COL_CHANGE).std().value();
+      mean[k] = trainingDF.getDoubles(COL_CHANGE).mean().value();
+
+      // We need at least 4 weeks of data
+      if (trainingDF.size() < 4) {
 
 Review comment:
   I added the spec validation. Thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org