You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/04/09 18:04:39 UTC

[GitHub] [incubator-pinot] tangdian commented on a change in pull request #4067: [TE] Holt Winters detector

tangdian commented on a change in pull request #4067: [TE] Holt Winters detector
URL: https://github.com/apache/incubator-pinot/pull/4067#discussion_r273631601
 
 

 ##########
 File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
 ##########
 @@ -0,0 +1,538 @@
+package org.apache.pinot.thirdeye.detection.components;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.math3.analysis.MultivariateFunction;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
+import org.apache.pinot.thirdeye.dataframe.LongSeries;
+import org.apache.pinot.thirdeye.dataframe.Series;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.Pattern;
+import org.apache.pinot.thirdeye.detection.algorithm.AlgorithmUtils;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.Tune;
+import org.apache.pinot.thirdeye.detection.spec.HoltWintersDetectorSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.components.Tunable;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.apache.commons.math3.optim.MaxIter;
+import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
+import org.apache.commons.math3.optim.MaxEval;
+import org.apache.commons.math3.optim.SimpleBounds;
+import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer;
+import org.apache.commons.math3.optim.InitialGuess;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+@Components(title = "Holt Winters triple exponential smoothing forecasting and detection",
+    type = "HOLT_WINTERS_RULE",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Forecast with holt winters triple exponential smoothing and generate anomalies",
+    params = {
+        @Param(name = "alpha"),
+        @Param(name = "beta"),
+        @Param(name = "gamma"),
+        @Param(name = "period"),
+        @Param(name = "pattern"),
+        @Param(name = "sensitivity"),
+        @Param(name = "kernelSmoothing")
+    })
+public class HoltWintersDetector implements BaselineProvider<HoltWintersDetectorSpec>,
+                                            AnomalyDetector<HoltWintersDetectorSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(HoltWintersDetector.class);
+
+  // Column names used in the intermediate data frames built by this detector
+  private static final String COL_CURR = "current";
+  private static final String COL_BASE = "baseline";
+  private static final String COL_ANOMALY = "anomaly";
+  private static final String COL_PATTERN = "pattern";
+  private static final String COL_DIFF = "diff";
+  private static final String COL_DIFF_VIOLATION = "diff_violation";
+  private static final String COL_ERROR = "error";
+  // Value of the kernelSmoothing spec flag that enables smoothing (compared case-insensitively in init)
+  private static final String TRUE = "true";
+  // Size of each chunk when fetching data in fetchDataByPart: one week (604800000 ms)
+  private static final long PARTITION_PERIOD = TimeUnit.DAYS.toMillis(7);
+  // Time span covered by the smoothing kernel: one hour (3600000 ms)
+  private static final long KERNEL_PERIOD = TimeUnit.HOURS.toMillis(1);
+  // Number of days of history fetched before the detection window
+  private static final int LOOKBACK = 60;
+  // Derived from LOOKBACK so the two cannot drift apart; evaluates to parsePeriod("60DAYS")
+  private static final Period LOOKBACK_PERIOD = ConfigUtils.parsePeriod(LOOKBACK + "DAYS");
+
+  private InputDataFetcher dataFetcher;
+
+  // Holt-Winters parameters read from the spec
+  private int period;
+  private double alpha;
+  private double beta;
+  private double gamma;
+  // Direction of deviation that counts as anomalous (UP, DOWN or UP_OR_DOWN)
+  private Pattern pattern;
+  // Granularity passed to MetricSlice when fetching data
+  private TimeGranularity timeGranularity;
+  private double sensitivity;
+  // Raw monitoring granularity string from the spec (e.g. "1_MONTHS")
+  private String monitoringGranularity;
+  // Whether runDetection applies kernel smoothing to non-daily input
+  private boolean kernelSmoothing;
+
+  /**
+   * Reads the detector configuration from the spec.
+   *
+   * @param spec detector spec holding the Holt-Winters parameters, pattern, sensitivity,
+   *             kernel-smoothing flag and monitoring granularity
+   * @param dataFetcher fetcher used to load time series and dataset configs
+   */
+  @Override
+  public void init(HoltWintersDetectorSpec spec, InputDataFetcher dataFetcher) {
+    this.period = spec.getPeriod();
+    this.alpha = spec.getAlpha();
+    this.beta = spec.getBeta();
+    this.gamma = spec.getGamma();
+    this.dataFetcher = dataFetcher;
+    this.pattern = Pattern.valueOf(spec.getPattern().toUpperCase());
+    // Null-safe: an unset kernelSmoothing flag disables smoothing instead of throwing an NPE
+    this.kernelSmoothing = TRUE.equalsIgnoreCase(spec.getKernelSmoothing());
+
+    this.sensitivity = spec.getSensitivity();
+    this.monitoringGranularity = spec.getMonitoringGranularity();
+    // Monthly monitoring fetches data at the dataset's native granularity
+    if ("1_MONTHS".equals(this.monitoringGranularity)) {
+      this.timeGranularity = MetricSlice.NATIVE_GRANULARITY;
+    } else {
+      this.timeGranularity = TimeGranularity.fromString(this.monitoringGranularity);
+    }
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    MetricEntity metricEntity = MetricEntity.fromSlice(slice, 0);
+    Interval window = new Interval(slice.getStart(), slice.getEnd());
+    DateTime trainStart = new DateTime(window.getStart()).minus(LOOKBACK_PERIOD);
+    DataFrame inputDf = fetchDataByPart(metricEntity, trainStart.getMillis(), window.getEndMillis());
+    DataFrame resultDF = computePredictionInterval(inputDf,
+        window.getStartMillis());
+
+    // Exclude the end
+    if (resultDF.size() > 1) {
+      resultDF = resultDF.head(resultDF.size() - 1);
+    }
+
+    return TimeSeries.fromDataFrame(resultDF);
+  }
+
+  @Override
+  public List<MergedAnomalyResultDTO> runDetection(Interval window, String metricUrn) {
+    MetricEntity metricEntity = MetricEntity.fromURN(metricUrn);
+    DateTime trainStart = new DateTime(window.getStart()).minus(LOOKBACK_PERIOD);
+    DatasetConfigDTO datasetConfig = this.dataFetcher.fetchData(new InputDataSpec()
+        .withMetricIdsForDataset(Collections.singleton(metricEntity.getId()))).getDatasetForMetricId()
+        .get(metricEntity.getId());
+
+    MetricSlice sliceData = MetricSlice.from(metricEntity.getId(), trainStart.getMillis(), window.getEndMillis(),
+        metricEntity.getFilters(), timeGranularity);
+
+    DataFrame dfInput = fetchDataByPart(metricEntity, trainStart.getMillis(), window.getEndMillis());
+
+    // Kernel smoothing
+    if (kernelSmoothing && !TimeUnit.DAYS.equals(datasetConfig.bucketTimeGranularity().getUnit())) {
+      int kernelSize = (int) (KERNEL_PERIOD / datasetConfig.bucketTimeGranularity().toMillis());
+      // kernel smoothing
+      if (kernelSize > 1) {
+        final int kernelOffset = kernelSize / 2;
+        double[] values = dfInput.getDoubles(COL_VALUE).values();
+        for (int i = 0; i < values.length - kernelSize + 1; i++) {
+          values[i + kernelOffset] = AlgorithmUtils.robustMean(dfInput.getDoubles(COL_VALUE)
+              .slice(i, i + kernelSize), kernelSize).getDouble(kernelSize - 1);
+        }
+        dfInput.addSeries(COL_VALUE, values);
+      }
+    }
+
+    DataFrame dfCurr = new DataFrame(dfInput).renameSeries(COL_VALUE, COL_CURR);
+    DataFrame dfBase = computePredictionInterval(dfInput, window.getStartMillis()).renameSeries(COL_VALUE, COL_BASE);
+    DataFrame df = new DataFrame(dfCurr).addSeries(dfBase);
+    df.addSeries(COL_DIFF, df.getDoubles(COL_CURR).subtract(df.get(COL_BASE)));
+    df.addSeries(COL_ANOMALY, BooleanSeries.fillValues(df.size(), false));
+
+    // consistent with pattern
+    if (pattern.equals(Pattern.UP_OR_DOWN) ) {
+      df.addSeries(COL_PATTERN, BooleanSeries.fillValues(df.size(), true));
+    } else {
+      df.addSeries(COL_PATTERN, pattern.equals(Pattern.UP) ? df.getDoubles(COL_DIFF).gt(0) :
+          df.getDoubles(COL_DIFF).lt(0));
+    }
+    df.addSeries(COL_DIFF_VIOLATION, df.getDoubles(COL_DIFF).abs().gte(df.getDoubles(COL_ERROR)));
+    df.mapInPlace(BooleanSeries.ALL_TRUE, COL_ANOMALY, COL_PATTERN, COL_DIFF_VIOLATION);
+
+    // anomalies
+    List<MergedAnomalyResultDTO> result = DetectionUtils.makeAnomalies(sliceData, df, COL_ANOMALY,
+        window.getEndMillis(),
+        DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity, datasetConfig), datasetConfig);
+
+    return result;
+  }
+
+  /**
+   * Fetches a metric between start and end in PARTITION_PERIOD-sized chunks and merges them.
+   *
+   * <p>The chunk results are appended, sorted by COL_TIME, and any row whose timestamp equals
+   * the previous row's is dropped (the first occurrence is kept).
+   *
+   * @param metricEntity metric entity
+   * @param start start timestamp (millis)
+   * @param end end timestamp (millis)
+   * @return data frame indexed by COL_TIME with data from start to end
+   */
+  private DataFrame fetchDataByPart(MetricEntity metricEntity, long start, long end) {
+    long duration = end - start;
+    List<MetricSlice> slices = new ArrayList<>();
+    long temp = start;
+    int parts = (int) (duration / PARTITION_PERIOD);
+    for (int i = 0; i < parts; i++) {
+      slices.add(MetricSlice.from(metricEntity.getId(), temp, temp + PARTITION_PERIOD,
+          metricEntity.getFilters(), timeGranularity));
+      temp += PARTITION_PERIOD;
+    }
+    // Skip the trailing [end, end) slice when the duration is an exact multiple of the
+    // partition period, but always request at least one slice
+    if (temp < end || slices.isEmpty()) {
+      slices.add(MetricSlice.from(metricEntity.getId(), temp, end,
+          metricEntity.getFilters(), timeGranularity));
+    }
+
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec().withTimeseriesSlices(slices)
+        .withMetricIdsForDataset(Collections.singletonList(metricEntity.getId())));
+
+    DataFrame df = new DataFrame();
+    df.addSeries(COL_TIME, LongSeries.buildFrom());
+    df.addSeries(COL_VALUE, DoubleSeries.buildFrom());
+    df.setIndex(COL_TIME);
+    for (DataFrame part : data.getTimeseries().values()) {
+      df = df.append(part);
+    }
+    df = df.sortedBy(COL_TIME);
+
+    // Remove rows whose timestamp repeats the previous row's. The index only advances when a
+    // row is kept: after deleting row i the next row shifts into position i and must be checked
+    // too, otherwise runs of three or more equal timestamps would survive.
+    long prev = -1;
+    int i = 0;
+    while (i < df.size()) {
+      long curr = df.getLongs(COL_TIME).get(i);
+      if (curr == prev) {
+        df = df.slice(0, i).append(df.slice(i + 1, df.size()));
+      } else {
+        prev = curr;
+        i++;
+      }
+    }
+    return df;
+  }
+
+  /**
+   * Returns a data frame containing the same time daily data
+   * @param originalDF the original dataframe
+   * @param time the time of day
+   * @return dataframe containing same time of daily data for LOOKBACK number of days
+   */
+  private DataFrame getDailyDF(DataFrame originalDF, Long time) {
+    LongSeries longSeries = (LongSeries) originalDF.get(COL_TIME);
+    Long start = longSeries.getLong(0);
+    DateTime dt = new DateTime(time);
+    DataFrame df = DataFrame.builder(COL_TIME, COL_VALUE).build();
+
+    for (int i = 0; i < LOOKBACK; i++) {
+      DateTime subDt = new DateTime(dt);
+      subDt = subDt.minusDays(1);
+      long t = subDt.getMillis();
 
 Review comment:
   I had to write `subDt = subDt.minusDays(1)` because Joda-Time `DateTime` is immutable: `subDt.minusDays(1)` returns a new instance and does not modify `subDt`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org