You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/04/19 21:34:41 UTC

[incubator-pinot] branch master updated: Add upper lower bounds (#4147)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 937ecb4  Add upper lower bounds (#4147)
937ecb4 is described below

commit 937ecb43b7fac4fda5fe104961085660aac1553c
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Fri Apr 19 14:34:36 2019 -0700

    Add upper lower bounds (#4147)
    
    * [TE] Add upper/lower bounds into TimeSeries
    
    * [TE] Add test for upper/lower bounds
    
    * [TE] Add current value into TimeSeries
---
 .../thirdeye/dataframe/util/DataFrameUtils.java    |  3 ++
 .../thirdeye/detection/spi/model/TimeSeries.java   | 48 +++++++++++++++++-----
 .../wrapper/BaselineFillingMergeWrapperTest.java   | 17 +++++++-
 3 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/DataFrameUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/DataFrameUtils.java
index a96730a..3a66a79 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/DataFrameUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/DataFrameUtils.java
@@ -62,6 +62,9 @@ import org.joda.time.PeriodType;
 public class DataFrameUtils {
   public static final String COL_TIME = "timestamp";
   public static final String COL_VALUE = "value";
+  public static final String COL_CURRENT = "current";
+  public static final String COL_UPPER_BOUND = "upper_bound";
+  public static final String COL_LOWER_BOUND = "lower_bound";
 
   /**
    * Returns a Thirdeye response parsed as a DataFrame. The method stores the time values in
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
index a32c693..755b880 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
@@ -37,33 +37,61 @@ public class TimeSeries {
     this.df = new DataFrame();
   }
 
-  /**
-   * Add the time stamps into the timeseries
-   * @param timestamps
-   */
-  public void addTimeStamps(LongSeries timestamps) {
+  public TimeSeries(LongSeries timestamps, DoubleSeries baselineValues) {
     this.df.addSeries(COL_TIME, timestamps).setIndex(COL_TIME);
+    this.df.addSeries(DataFrameUtils.COL_VALUE, baselineValues);
+  }
+
+  public TimeSeries(LongSeries timestamps, DoubleSeries baselineValues, DoubleSeries currentValues,
+      DoubleSeries upperBoundValues, DoubleSeries lowerBoundValues) {
+    this(timestamps, baselineValues);
+    this.df.addSeries(DataFrameUtils.COL_CURRENT, currentValues);
+    this.df.addSeries(DataFrameUtils.COL_UPPER_BOUND, upperBoundValues);
+    this.df.addSeries(DataFrameUtils.COL_LOWER_BOUND, lowerBoundValues);
   }
 
   /**
-   * Add the predicted baseline into the timeseries
-   * @param baselineValues predicted baseline values
+   * Add the series into TimeSeries if it exists in the DataFrame.
+   * @param df The source DataFrame.
+   * @param name The series name.
    */
-  public void addPredictedBaseline(DoubleSeries baselineValues) {
-    this.df.addSeries(DataFrameUtils.COL_VALUE, baselineValues);
+  private static void addSeries(TimeSeries ts, DataFrame df, String name) {
+    if (df.contains(name)) {
+      ts.df.addSeries(name, df.get(name));
+    }
   }
 
+  /**
+   * Add DataFrame into TimeSeries.
+   * @param df The source DataFrame.
+   * @return TimeSeries that contains the predicted values.
+   */
   public static TimeSeries fromDataFrame(DataFrame df) {
     TimeSeries ts = new TimeSeries();
     ts.df.addSeries(COL_TIME, df.get(COL_TIME)).setIndex(COL_TIME);
-    ts.df.addSeries(DataFrameUtils.COL_VALUE, df.get(DataFrameUtils.COL_VALUE));
+    addSeries(ts, df, COL_VALUE);
+    addSeries(ts, df, COL_CURRENT);
+    addSeries(ts, df, COL_UPPER_BOUND);
+    addSeries(ts, df, COL_LOWER_BOUND);
     return ts;
   }
 
+  public DoubleSeries getCurrent() {
+    return this.df.getDoubles(DataFrameUtils.COL_CURRENT);
+  }
+
   public DoubleSeries getPredictedBaseline() {
     return this.df.getDoubles(DataFrameUtils.COL_VALUE);
   }
 
+  public DoubleSeries getPredictedUpperBound() {
+    return this.df.getDoubles(DataFrameUtils.COL_UPPER_BOUND);
+  }
+
+  public DoubleSeries getPredictedLowerBound() {
+    return this.df.getDoubles(DataFrameUtils.COL_LOWER_BOUND);
+  }
+
   public DataFrame getDataFrame() {
     return df;
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 0b4c0d8..05801cb 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -125,7 +125,8 @@ public class BaselineFillingMergeWrapperTest {
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
     spec.setBaselineAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
     spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(
-        DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
+        DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE" , COL_UPPER_BOUND + ":DOUBLE", COL_LOWER_BOUND + ":DOUBLE")
+            .append(3000, 100, 200, 50).build())));
     InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
     baselineProvider.init(spec, dataFetcher);
     this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
@@ -138,9 +139,15 @@ public class BaselineFillingMergeWrapperTest {
     Assert.assertEquals(anomalyResults.size(), 1);
     Assert.assertTrue(anomalyResults.contains(anomaly));
     Assert.assertEquals(anomalyResults.get(0).getAvgBaselineVal(), 100.0);
+    Assert.assertEquals(anomalyResults.get(0).getAvgBaselineVal(), 100.0);
     Assert.assertEquals(anomalyResults.get(0).getAvgCurrentVal(), 100.0);
     Assert.assertEquals(anomalyResults.get(0).getProperties().get("detectorComponentName"), "testDetector");
     Assert.assertEquals(anomalyResults.get(0).getProperties().get("baselineProviderComponentName"), "baseline");
+
+    TimeSeries ts = baselineProvider.computePredictedTimeSeries(MetricSlice.from(1, 3000, 3600));
+    Assert.assertEquals(ts.getPredictedBaseline().get(0), 100.0);
+    Assert.assertEquals(ts.getPredictedUpperBound().get(0), 200.0);
+    Assert.assertEquals(ts.getPredictedLowerBound().get(0), 50.0);
   }
 
   @Test
@@ -165,7 +172,8 @@ public class BaselineFillingMergeWrapperTest {
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
     spec.setBaselineAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
     spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(
-        DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
+        DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE", COL_UPPER_BOUND + ":DOUBLE", COL_LOWER_BOUND + ":DOUBLE")
+            .append(3000, 100, 200, 50).build())));
     InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
     baselineProvider.init(spec, dataFetcher);
     this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
@@ -179,5 +187,10 @@ public class BaselineFillingMergeWrapperTest {
     Assert.assertEquals(anomalyResults.get(0).getAvgCurrentVal(), 999.0);
     Assert.assertEquals(anomalyResults.get(0).getProperties().get("detectorComponentName"), "testDetector");
     Assert.assertEquals(anomalyResults.get(0).getProperties().get("baselineProviderComponentName"), "someBaseline");
+
+    TimeSeries ts = baselineProvider.computePredictedTimeSeries(MetricSlice.from(1, 3000, 3600));
+    Assert.assertEquals(ts.getPredictedBaseline().get(0), 100.0);
+    Assert.assertEquals(ts.getPredictedUpperBound().get(0), 200.0);
+    Assert.assertEquals(ts.getPredictedLowerBound().get(0), 50.0);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org