You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2018/11/21 18:53:20 UTC

[incubator-pinot] branch master updated: [TE] Anomaly function to new pipeline YAML converter (#3537)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4162fb3  [TE] Anomaly function to new pipeline YAML converter (#3537)
4162fb3 is described below

commit 4162fb3264b4e3f1195e3f522c4f39604cad09ce
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Wed Nov 21 10:53:15 2018 -0800

    [TE] Anomaly function to new pipeline YAML converter (#3537)
    
    - The endpoint to convert a legacy anomaly function to new pipeline YAML
    - New dimension wrapper to support legacy data filter minLiveZone
---
 .../detection/DetectionMigrationResource.java      | 162 ++++++++++++++++++++-
 .../detection/algorithm/DimensionWrapper.java      |  63 ++++++--
 .../detection/annotation/DetectionRegistry.java    |   2 +-
 .../thirdeye/detection/spi/components/Tunable.java |   2 +-
 .../detection/wrapper/AnomalyDetectorWrapper.java  |  10 +-
 .../wrapper/BaselineFillingMergeWrapper.java       |   9 +-
 .../yaml/CompositePipelineConfigTranslator.java    | 107 +++++++++++---
 .../yaml/YamlDetectionConfigTranslator.java        |   2 +-
 .../thirdeye/detection/yaml/YamlResource.java      |   5 +
 .../CompositePipelineConfigTranslatorTest.java     |   3 +-
 .../compositePipelineTranslatorTestResult-1.json   |  59 +++++---
 .../compositePipelineTranslatorTestResult-2.json   |  15 +-
 .../thirdeye/detection/yaml/pipeline-config-1.yaml |   8 +
 13 files changed, 370 insertions(+), 77 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index f49e810..28e30f2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -16,6 +16,8 @@
 
 package com.linkedin.thirdeye.detection;
 
+import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
 import com.linkedin.thirdeye.datalayer.bao.AnomalyFunctionManager;
 import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.MetricConfigManager;
@@ -23,14 +25,27 @@ import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
+import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+
+import static com.linkedin.thirdeye.anomaly.merge.AnomalyMergeStrategy.*;
 
 
 /**
@@ -39,22 +54,159 @@ import org.slf4j.LoggerFactory;
 @Path("/migrate")
 public class DetectionMigrationResource {
   private static final Logger LOGGER = LoggerFactory.getLogger(DetectionMigrationResource.class);
+  private static final String PROP_WINDOW_DELAY = "windowDelay";
+  private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
 
   private final LegacyAnomalyFunctionTranslator translator;
   private final AnomalyFunctionManager anomalyFunctionDAO;
   private final DetectionConfigManager detectionConfigDAO;
+  private final Yaml yaml;
 
   /**
    * Instantiates a new Detection migration resource.
    *
    * @param anomalyFunctionFactory the anomaly function factory
    */
-  public DetectionMigrationResource(MetricConfigManager metricConfigDAO,
-      AnomalyFunctionManager anomalyFunctionDAO, DetectionConfigManager detectionConfigDAO,
-      AnomalyFunctionFactory anomalyFunctionFactory, AlertFilterFactory alertFilterFactory) {
+  public DetectionMigrationResource(MetricConfigManager metricConfigDAO, AnomalyFunctionManager anomalyFunctionDAO,
+      DetectionConfigManager detectionConfigDAO, AnomalyFunctionFactory anomalyFunctionFactory,
+      AlertFilterFactory alertFilterFactory) {
     this.anomalyFunctionDAO = anomalyFunctionDAO;
     this.detectionConfigDAO = detectionConfigDAO;
     this.translator = new LegacyAnomalyFunctionTranslator(metricConfigDAO, anomalyFunctionFactory, alertFilterFactory);
+    DumperOptions options = new DumperOptions();
+    options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
+    options.setPrettyFlow(true);
+    this.yaml = new Yaml(options);
+  }
+
+  /**
+   * Endpoint to convert an existing anomaly function to a composite pipeline yaml
+   *
+   * @param anomalyFunctionId the anomaly function id
+   * @return the yaml config as string
+   */
+  @GET
+  public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws Exception {
+    AnomalyFunctionDTO anomalyFunctionDTO = this.anomalyFunctionDAO.findById(anomalyFunctionId);
+    Map<String, Object> yamlConfigs = new LinkedHashMap<>();
+    yamlConfigs.put("detectionName", anomalyFunctionDTO.getFunctionName());
+    yamlConfigs.put("metric", anomalyFunctionDTO.getMetric());
+    yamlConfigs.put("dataset", anomalyFunctionDTO.getCollection());
+    yamlConfigs.put("pipelineType", "Composite");
+    if (anomalyFunctionDTO.getExploreDimensions() != null) {
+      // dimension explore and data filter
+      yamlConfigs.put("dimensionExploration",
+          getDimensionExplorationParams(anomalyFunctionDTO));
+    }
+    if (anomalyFunctionDTO.getFilters() != null){
+      yamlConfigs.put("filters",
+          AnomalyDetectionInputContextBuilder.getFiltersForFunction(anomalyFunctionDTO.getFilters()).asMap());
+    }
+
+    Map<String, Object> ruleYaml = new LinkedHashMap<>();
+    ruleYaml.put("name", "myRule");
+
+    // detection
+    ruleYaml.put("detection", Collections.singletonList(
+        ImmutableMap.of("type", "ALGORITHM", "params", getAlgorithmDetectorParams(anomalyFunctionDTO))));
+
+    // filters
+    Map<String, String> alertFilter = anomalyFunctionDTO.getAlertFilter();
+
+    if (alertFilter != null && !alertFilter.isEmpty()){
+      Map<String, Object> filterYaml = new LinkedHashMap<>();
+      if (!alertFilter.containsKey("thresholdField")) {
+        // algorithm alert filter
+        filterYaml = ImmutableMap.of("type", "ALGORITHM_FILTER", "params", getAlertFilterParams(anomalyFunctionDTO));
+      } else {
+        // threshold filters migrate to rule-based filters
+        // site-wide impact filter migrates to the rule-based SWI filter
+        if (anomalyFunctionDTO.getAlertFilter().get("thresholdField").equals("impactToGlobal")){
+          filterYaml.put("type", "SITEWIDE_IMPACT_FILTER");
+          filterYaml.put("params", getSiteWideImpactFilterParams(anomalyFunctionDTO));
+        }
+        // weight filter migrates to the rule-based percentage change filter
+        if (anomalyFunctionDTO.getAlertFilter().get("thresholdField").equals("weight")){
+          filterYaml.put("type", "PERCENTAGE_CHANGE_FILTER");
+          filterYaml.put("params", getPercentageChangeFilterParams(anomalyFunctionDTO));
+        }
+      }
+      ruleYaml.put("filter", Collections.singletonList(filterYaml));
+    }
+
+    yamlConfigs.put("rules", Collections.singletonList(ruleYaml));
+
+    // merger configs
+    if (anomalyFunctionDTO.getAnomalyMergeConfig() != null ) {
+      Map<String, Object> mergerYaml = new LinkedHashMap<>();
+      if (anomalyFunctionDTO.getAnomalyMergeConfig().getMergeStrategy() == FUNCTION_DIMENSIONS){
+        mergerYaml.put("maxGap", anomalyFunctionDTO.getAnomalyMergeConfig().getSequentialAllowedGap());
+        mergerYaml.put("maxDuration", anomalyFunctionDTO.getAnomalyMergeConfig().getMaxMergeDurationLength());
+      }
+      yamlConfigs.put("merger", mergerYaml);
+    }
+
+    return this.yaml.dump(yamlConfigs);
+  }
+
+  private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO functionDTO) {
+    Map<String, Object> dimensionExploreYaml = new LinkedHashMap<>();
+    dimensionExploreYaml.put("dimensions", Collections.singletonList(functionDTO.getExploreDimensions()));
+    if (functionDTO.getDataFilter() != null && !functionDTO.getDataFilter().isEmpty() && functionDTO.getDataFilter().get("type").equals("average_threshold")) {
+      // migrate average threshold data filter
+      dimensionExploreYaml.put("dimensionFilterMetric", functionDTO.getDataFilter().get("metricName"));
+      dimensionExploreYaml.put("minValue", Double.valueOf(functionDTO.getDataFilter().get("threshold")));
+      dimensionExploreYaml.put("minLiveZone", functionDTO.getDataFilter().get("minLiveZone"));
+    }
+    return dimensionExploreYaml;
+  }
+
+  private Map<String, Object> getPercentageChangeFilterParams(AnomalyFunctionDTO functionDTO) {
+    Map<String, Object> filterYamlParams = new LinkedHashMap<>();
+    filterYamlParams.put("threshold", Math.abs(Double.valueOf(functionDTO.getAlertFilter().get("maxThreshold"))));
+    filterYamlParams.put("pattern", "up_or_down");
+    return filterYamlParams;
+  }
+
+  private Map<String, Object> getSiteWideImpactFilterParams(AnomalyFunctionDTO functionDTO) {
+    Map<String, Object> filterYamlParams = new LinkedHashMap<>();
+    filterYamlParams.put("threshold", Math.abs(Double.valueOf(functionDTO.getAlertFilter().get("maxThreshold"))));
+    filterYamlParams.put("pattern", "up_or_down");
+    filterYamlParams.put("sitewideMetricName", functionDTO.getGlobalMetric());
+    filterYamlParams.put("sitewideCollection", functionDTO.getCollection());
+    filterYamlParams.put("filters", AnomalyDetectionInputContextBuilder.getFiltersForFunction(functionDTO.getGlobalMetricFilters()).asMap());
+    return filterYamlParams;
+  }
+
+  private Map<String, Object> getAlertFilterParams(AnomalyFunctionDTO functionDTO) {
+    Map<String, Object> filterYamlParams = new LinkedHashMap<>();
+    Map<String, Object> params = new HashMap<>();
+    filterYamlParams.put("configuration", params);
+    params.putAll(functionDTO.getAlertFilter());
+    params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
+    // TODO  timezone
+    return filterYamlParams;
+  }
+
+  private String getBucketPeriod(AnomalyFunctionDTO functionDTO) {
+    return new Period(TimeUnit.MILLISECONDS.convert(functionDTO.getBucketSize(), functionDTO.getBucketUnit())).toString();
+  }
+
+  private Map<String, Object> getAlgorithmDetectorParams(AnomalyFunctionDTO functionDTO) throws Exception {
+    Map<String, Object> detectorYaml = new LinkedHashMap<>();
+    Map<String, Object> params = new LinkedHashMap<>();
+    detectorYaml.put("configuration", params);
+    Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+    for (Map.Entry<Object, Object> property : properties.entrySet()) {
+      params.put((String) property.getKey(), property.getValue());
+    }
+    params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
+    // TODO  timezone
+    if (functionDTO.getWindowDelay() != 0) {
+      detectorYaml.put(PROP_WINDOW_DELAY, functionDTO.getWindowDelay());
+      detectorYaml.put(PROP_WINDOW_DELAY_UNIT, functionDTO.getWindowDelayUnit().toString());
+    }
+    return detectorYaml;
   }
 
   /**
@@ -66,9 +218,7 @@ public class DetectionMigrationResource {
    * @throws Exception the exception
    */
   @POST
-  public Response migrateToDetectionPipeline(
-      @QueryParam("id") long anomalyFunctionId,
-      @QueryParam("name") String name,
+  public Response migrateToDetectionPipeline(@QueryParam("id") long anomalyFunctionId, @QueryParam("name") String name,
       @QueryParam("lastTimestamp") Long lastTimestamp) throws Exception {
     AnomalyFunctionDTO anomalyFunctionDTO = this.anomalyFunctionDAO.findById(anomalyFunctionId);
     DetectionConfigDTO config = this.translator.translate(anomalyFunctionDTO);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java
index 28ed691..4d2b2a6 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -35,10 +35,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.collections.MapUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 
@@ -49,6 +52,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
  * each filtered time series.
  */
 public class DimensionWrapper extends DetectionPipeline {
+  private static final Logger LOG = LoggerFactory.getLogger(DimensionWrapper.class);
 
   // prototyping
   private static final String PROP_NESTED = "nested";
@@ -66,8 +70,12 @@ public class DimensionWrapper extends DetectionPipeline {
   private final double minValue;
   private final double minValueHourly;
   private final double minValueDaily;
+  private final double minLiveZone;
+  private final double liveBucketPercentageThreshold;
   private final Period lookback;
   private final DateTimeZone timezone;
+  private DateTime start;
+  private DateTime end;
 
   protected final String nestedMetricUrnKey;
   protected final List<String> dimensions;
@@ -77,7 +85,7 @@ public class DimensionWrapper extends DetectionPipeline {
   public DimensionWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
 
-    // exploration
+    // the metric used in dimension exploration
     this.metricUrn = MapUtils.getString(config.getProperties(), "metricUrn", null);
     this.minContribution = MapUtils.getDoubleValue(config.getProperties(), "minContribution", Double.NaN);
     this.minValue = MapUtils.getDoubleValue(config.getProperties(), "minValue", Double.NaN);
@@ -88,10 +96,26 @@ public class DimensionWrapper extends DetectionPipeline {
     this.lookback = ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "lookback", "1w"));
     this.timezone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), "timezone", "America/Los_Angeles"));
 
-    // prototyping
+    /*
+     * A bucket of the time series is taken into consideration only if its value is above minLiveZone. In other words,
+     * if a bucket's value is not above minLiveZone, then this bucket is ignored when calculating the average value.
+     * Used for outlier removal. Replaces the legacy average threshold filter.
+     */
+    this.minLiveZone = MapUtils.getDoubleValue(config.getProperties(), "minLiveZone", Double.NaN);
+    this.liveBucketPercentageThreshold = MapUtils.getDoubleValue(config.getProperties(), "liveBucketPercentageThreshold", 0.5);
+
+    // the metric to run the detection for
     this.nestedMetricUrns = ConfigUtils.getList(config.getProperties().get(PROP_NESTED_METRIC_URNS), Collections.singletonList(this.metricUrn));
     this.nestedMetricUrnKey = MapUtils.getString(config.getProperties(), PROP_NESTED_METRIC_URN_KEY, PROP_NESTED_METRIC_URN_KEY_DEFAULT);
     this.nestedProperties = ConfigUtils.getList(config.getProperties().get(PROP_NESTED));
+
+    this.start = new DateTime(this.startTime, this.timezone);
+    this.end = new DateTime(this.endTime, this.timezone);
+
+    DateTime minStart = this.end.minus(this.lookback);
+    if (minStart.isBefore(this.start)) {
+      this.start = minStart;
+    }
   }
 
   @Override
@@ -100,19 +124,10 @@ public class DimensionWrapper extends DetectionPipeline {
 
     if (this.metricUrn != null) {
       // metric and dimension exploration
-
-      DateTime start = new DateTime(this.startTime, this.timezone);
-      DateTime end = new DateTime(this.endTime, this.timezone);
-
-      DateTime minStart = end.minus(this.lookback);
-      if (minStart.isBefore(start)) {
-        start = minStart;
-      }
-
-      Period testPeriod = new Period(start, end);
+      Period testPeriod = new Period(this.start, this.end);
 
       MetricEntity metric = MetricEntity.fromURN(this.metricUrn);
-      MetricSlice slice = MetricSlice.from(metric.getId(), start.getMillis(), end.getMillis(), metric.getFilters());
+      MetricSlice slice = MetricSlice.from(metric.getId(), this.start.getMillis(), this.end.getMillis(), metric.getFilters());
 
       DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions).get(slice);
 
@@ -128,7 +143,8 @@ public class DimensionWrapper extends DetectionPipeline {
       }
 
       // min value
-      if (!Double.isNaN(this.minValue)) {
+      // check min value only if min live zone is not set; otherwise use checkMinLiveZone below
+      if (!Double.isNaN(this.minValue) && Double.isNaN(this.minLiveZone)) {
         aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).gte(this.minValue)).dropNull();
       }
 
@@ -168,11 +184,18 @@ public class DimensionWrapper extends DetectionPipeline {
       }
     }
 
+    if (!Double.isNaN(this.minLiveZone) && !Double.isNaN(this.minValue)) {
+      // filter out all nested metrics that didn't pass the live zone check
+      nestedMetrics = nestedMetrics.stream().filter(metricEntity -> checkMinLiveZone(metricEntity)).collect(Collectors.toList());
+    }
+
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     Map<String, Object> diagnostics = new HashMap<>();
 
+    LOG.info("exploring {} metrics", nestedMetrics.size());
     for (MetricEntity metric : nestedMetrics) {
       for (Map<String, Object> properties : this.nestedProperties) {
+        LOG.info("running detection for {}", metric.toString());
         DetectionPipelineResult intermediate = this.runNested(metric, properties);
 
         anomalies.addAll(intermediate.getAnomalies());
@@ -184,6 +207,18 @@ public class DimensionWrapper extends DetectionPipeline {
         .setDiagnostics(diagnostics);
   }
 
+  private boolean checkMinLiveZone(MetricEntity me) {
+    MetricSlice slice = MetricSlice.from(me.getId(), this.start.getMillis(), this.end.getMillis(), me.getFilters());
+    DataFrame df = this.provider.fetchTimeseries(Collections.singleton(slice)).get(slice);
+    long totalBuckets = df.size();
+    df = df.filter(df.getDoubles(COL_VALUE).gt(this.minLiveZone)).dropNull();
+    double liveBucketPercentage = (double) df.size() / (double) totalBuckets;
+    if (liveBucketPercentage >= this.liveBucketPercentageThreshold) {
+      return df.getDoubles(COL_VALUE).mean().getDouble(0)>= this.minValue;
+    }
+    return false;
+  }
+
   protected DetectionPipelineResult runNested(MetricEntity metric, Map<String, Object> template) throws Exception {
     Preconditions.checkArgument(template.containsKey(PROP_CLASS_NAME), "Nested missing " + PROP_CLASS_NAME);
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java
index f9ccf16..7741eb1 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java
@@ -112,7 +112,7 @@ public class DetectionRegistry {
    */
   public String lookupTunable(String className) {
     Preconditions.checkArgument(TUNE_MAP.containsKey(className), className + " not found in registry");
-    return TUNE_MAP.get(className).tunable();
+    return this.lookup(TUNE_MAP.get(className).tunable());
   }
 
   /**
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
index e797fb3..c8e6e48 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
@@ -30,7 +30,7 @@ public interface Tunable<T extends AbstractSpec> extends BaseComponent<T> {
   /**
    * Returns the new spec for the component it's tuning
    * @param currentSpec current spec for the component. empty if not exist
-   * @return input data spec
+   * @return the init spec for the component it's tuning
    */
   Map<String, Object> tune(Map<String, Object> currentSpec, Interval trainingWindow);
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 45d8270..e8e938f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -61,6 +61,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
   private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
+
   private static final Logger LOG = LoggerFactory.getLogger(
       AnomalyDetectorWrapper.class);
 
@@ -76,6 +78,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private DateTimeZone dateTimeZone;
   // need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes.
   private final TimeGranularity functionFrequency;
+  private final String detectorReferenceKey;
 
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
@@ -83,9 +86,9 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     this.metricUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
 
     Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_DETECTOR));
-    String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
-    Preconditions.checkArgument(this.config.getComponents().containsKey(detectorReferenceKey));
-    this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(detectorReferenceKey);
+    this.detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
+    Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorReferenceKey));
+    this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(this.detectorReferenceKey);
 
     this.isMovingWindowDetection = MapUtils.getBooleanValue(config.getProperties(), PROP_MOVING_WINDOW_DETECTION, false);
     // delays to wait for data becomes available
@@ -115,6 +118,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
       anomaly.setMetric(metric.getName());
       anomaly.setCollection(metric.getDataset());
       anomaly.setDimensions(DetectionUtils.toFilterMap(me.getFilters()));
+      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_KEY, this.detectorReferenceKey);
     }
     return new DetectionPipelineResult(anomalies);
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 2547a99..d743971 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -49,19 +49,21 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
   private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
   private static final String PROP_CURRENT_PROVIDER = "currentValueProvider";
   private static final String PROP_METRIC_URN = "metricUrn";
+  private static final String PROP_BASELINE_PROVIDER_COMPONENT_KEY = "baselineProviderComponentKey";
 
   private BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
   private BaselineProvider currentValueProvider;
   private Series.DoubleFunction aggregationFunction;
+  private String baselineProviderComponentKey;
 
   public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
   {
     super(provider, config, startTime, endTime);
 
     if (config.getProperties().containsKey(PROP_BASELINE_PROVIDER)) {
-      String referenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_BASELINE_PROVIDER));
-      Preconditions.checkArgument(this.config.getComponents().containsKey(referenceKey));
-      this.baselineValueProvider = (BaselineProvider) this.config.getComponents().get(referenceKey);
+      this.baselineProviderComponentKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_BASELINE_PROVIDER));
+      Preconditions.checkArgument(this.config.getComponents().containsKey(this.baselineProviderComponentKey));
+      this.baselineValueProvider = (BaselineProvider) this.config.getComponents().get(this.baselineProviderComponentKey);
     }
     if (config.getProperties().containsKey(PROP_CURRENT_PROVIDER)) {
       String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), currentValueProvider));
@@ -104,6 +106,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
         anomaly.setAvgCurrentVal(this.currentValueProvider.computePredictedAggregates(slice, aggregationFunction));
         if (this.baselineValueProvider != null) {
           anomaly.setAvgBaselineVal(this.baselineValueProvider.computePredictedAggregates(slice, aggregationFunction));
+          anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_KEY, this.baselineProviderComponentKey);
         }
       } catch (Exception e) {
         // ignore
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 4094ba9..45ecf73 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -3,7 +3,9 @@ package com.linkedin.thirdeye.detection.yaml;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
+import com.linkedin.thirdeye.api.TimeGranularity;
 import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.ConfigUtils;
@@ -29,6 +31,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
@@ -121,19 +124,31 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
   private static final String PROP_CLASS_NAME = "className";
   private static final String PROP_PARAMS = "params";
   private static final String PROP_METRIC_URN = "metricUrn";
+  private static final String PROP_DIMENSION_FILTER_METRIC = "dimensionFilterMetric";
+  private static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
   private static final String PROP_RULES = "rules";
   private static final String PROP_NESTED = "nested";
   private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
   private static final String PROP_NAME = "name";
   private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
+  private static final String PROP_WINDOW_DELAY = "windowDelay";
+  private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+  private static final String PROP_WINDOW_SIZE = "windowSize";
+  private static final String PROP_WINDOW_UNIT = "windowUnit";
+  private static final String PROP_FREQUENCY = "frequency";
+  private static final String PROP_MERGER = "merger";
 
   private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
-  private static final Map<String, String> DETECTOR_TO_BASELINE = ImmutableMap.of();
+  private static final Map<String, String> DETECTOR_TO_BASELINE = ImmutableMap.of("ALGORITHM", "ALGORITHM_BASELINE");
+  private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM");
 
   private final Map<String, Object> components = new HashMap<>();
   private MetricConfigDTO metricConfig;
   private DatasetConfigDTO datasetConfig;
   private String metricUrn;
+  private Map<String, Object> mergerProperties = new HashMap<>();
+  private Map<String, Collection<String>> filterMaps;
 
   public CompositePipelineConfigTranslator(Map<String, Object> yamlConfig, DataProvider provider) {
     super(yamlConfig, provider);
@@ -149,7 +164,11 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
         .get(metricConfig.getDataset());
     Preconditions.checkNotNull(this.datasetConfig, "dataset not found");
 
-    this.metricUrn = buildMetricUrn(yamlConfig);
+    // if user set merger properties
+    this.mergerProperties = MapUtils.getMap(yamlConfig, PROP_MERGER, new HashMap());
+
+    Map<String, Collection<String>> filterMaps = MapUtils.getMap(yamlConfig, PROP_FILTERS);
+    this.metricUrn = buildMetricUrn(filterMaps, this.metricConfig.getId());
     String cron = buildCron();
 
     List<Map<String, Object>> ruleYamls = getList(yamlConfig.get(PROP_RULES));
@@ -170,15 +189,29 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
         nestedPipelines.addAll(filterNestedProperties);
       }
     }
-    Map<String, Object> dimensionWrapperProperties = new HashMap<>();
-    dimensionWrapperProperties.putAll(MapUtils.getMap(yamlConfig, PROP_DIMENSION_EXPLORATION));
-    dimensionWrapperProperties.put(PROP_METRIC_URN, metricUrn);
+    Map<String, Object> dimensionWrapperProperties = buildDimensionWrapperProperties(filterMaps);
     Map<String, Object> properties = buildWrapperProperties(ChildKeepingMergeWrapper.class.getName(),
         Collections.singletonList(
-            buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)));
+            buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)), this.mergerProperties);
     return new YamlTranslationResult().withProperties(properties).withComponents(this.components).withCron(cron);
   }
 
+  private Map<String, Object> buildDimensionWrapperProperties(Map<String, Collection<String>> filterMaps) {
+    Map<String, Object> dimensionWrapperProperties = new HashMap<>();
+    dimensionWrapperProperties.put(PROP_NESTED_METRIC_URNS, Collections.singletonList(this.metricUrn));
+    if (yamlConfig.containsKey(PROP_DIMENSION_EXPLORATION)) {
+      Map<String, Object> dimensionExploreYaml = MapUtils.getMap(this.yamlConfig, PROP_DIMENSION_EXPLORATION);
+      dimensionWrapperProperties.putAll(dimensionExploreYaml);
+      if (dimensionExploreYaml.containsKey(PROP_DIMENSION_FILTER_METRIC)){
+        MetricConfigDTO dimensionExploreMetric = this.dataProvider.fetchMetric(MapUtils.getString(dimensionExploreYaml, PROP_DIMENSION_FILTER_METRIC), this.datasetConfig.getDataset());
+        dimensionWrapperProperties.put(PROP_METRIC_URN, buildMetricUrn(filterMaps, dimensionExploreMetric.getId()));
+      } else {
+        dimensionWrapperProperties.put(PROP_METRIC_URN, this.metricUrn);
+      }
+    }
+    return dimensionWrapperProperties;
+  }
+
   private List<Map<String, Object>> buildListOfMergeWrapperProperties(String ruleName,
       List<Map<String, Object>> yamlConfigs) {
     List<Map<String, Object>> properties = new ArrayList<>();
@@ -190,11 +223,13 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
 
   private Map<String, Object> buildMergeWrapperProperties(String ruleName, Map<String, Object> yamlConfig) {
     String detectorType = MapUtils.getString(yamlConfig, PROP_TYPE);
+    long id = MapUtils.getLong(yamlConfig, "id", 0L);
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
-    String detectorKey = makeComponentKey(ruleName, detectorType);
+    String detectorKey = makeComponentKey(ruleName, detectorType, id);
     nestedProperties.put(PROP_DETECTOR, detectorKey);
-    // TODO insert window size & unit
+
+    fillInWindowSizeAndUnit(nestedProperties, yamlConfig, detectorType);
 
     buildComponentSpec(yamlConfig, detectorType, detectorKey);
 
@@ -205,13 +240,52 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     if (DETECTOR_TO_BASELINE.containsKey(detectorType)) {
       baselineProviderType = DETECTOR_TO_BASELINE.get(detectorType);
     }
-    String baselineProviderKey = makeComponentKey(ruleName + "_" + detectorType,  baselineProviderType);
+    String baselineProviderKey = makeComponentKey(ruleName, baselineProviderType, id);
     properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
     buildComponentSpec(yamlConfig, baselineProviderType, baselineProviderKey);
-
+    properties.putAll(this.mergerProperties);
     return properties;
   }
 
+  // fill in window size and unit if detector requires this
+  private void fillInWindowSizeAndUnit(Map<String, Object> properties, Map<String, Object> yamlConfig, String detectorType) {
+    if (MOVING_WINDOW_DETECTOR_TYPES.contains(detectorType)) {
+      properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+      switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
+        case MINUTES:
+          properties.put(PROP_WINDOW_SIZE, 6);
+          properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+          properties.put(PROP_FREQUENCY, new TimeGranularity(15, TimeUnit.MINUTES));
+          break;
+        case HOURS:
+          properties.put(PROP_WINDOW_SIZE, 24);
+          properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+          break;
+        case DAYS:
+          properties.put(PROP_WINDOW_SIZE, 1);
+          properties.put(PROP_WINDOW_UNIT, TimeUnit.DAYS);
+          // TODO completeness checker true
+          break;
+        default:
+          properties.put(PROP_WINDOW_SIZE, 6);
+          properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+      }
+      // override from yaml
+      if (yamlConfig.containsKey(PROP_WINDOW_SIZE)) {
+        properties.put(PROP_WINDOW_SIZE, MapUtils.getString(yamlConfig, PROP_WINDOW_SIZE));
+      }
+      if (yamlConfig.containsKey(PROP_WINDOW_UNIT)) {
+        properties.put(PROP_WINDOW_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_UNIT));
+      }
+      if (yamlConfig.containsKey(PROP_WINDOW_DELAY)) {
+        properties.put(PROP_WINDOW_DELAY, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY));
+      }
+      if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
+        properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
+      }
+    }
+  }
+
   private List<Map<String, Object>> buildFilterWrapperProperties(String wrapperClassName,
       Map<String, Object> yamlConfig, List<Map<String, Object>> nestedProperties, String ruleName) {
     if (yamlConfig == null || yamlConfig.isEmpty()) {
@@ -221,8 +295,9 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     if (wrapperProperties.isEmpty()) {
       return Collections.emptyList();
     }
+    long id = MapUtils.getLong(yamlConfig, "id", 0L);
     String filterType = MapUtils.getString(yamlConfig, PROP_TYPE);
-    String filterKey = makeComponentKey(ruleName, filterType);
+    String filterKey = makeComponentKey(ruleName, filterType, id);
     wrapperProperties.put(PROP_FILTER, filterKey);
     buildComponentSpec(yamlConfig, filterType, filterKey);
 
@@ -265,9 +340,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     }
   }
 
-  private String buildMetricUrn(Map<String, Object> yamlConfig) {
-    Map<String, Collection<String>> filterMaps = MapUtils.getMap(yamlConfig, PROP_FILTERS);
-
+  private String buildMetricUrn(Map<String, Collection<String>> filterMaps, long metricId) {
     Multimap<String, String> filters = ArrayListMultimap.create();
     if (filterMaps != null) {
       for (Map.Entry<String, Collection<String>> entry : filterMaps.entrySet()) {
@@ -275,7 +348,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
       }
     }
 
-    MetricEntity me = MetricEntity.fromMetric(1.0, this.metricConfig.getId(), filters);
+    MetricEntity me = MetricEntity.fromMetric(1.0, metricId, filters);
     return me.getUrn();
   }
 
@@ -322,8 +395,8 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     return tunable;
   }
 
-  private String makeComponentKey(String name, String type) {
-    return "$" + name + "_" + type;
+  private String makeComponentKey(String name, String type, long id) {
+    return "$" + name + ":" + type + ":" + id;
   }
 
   @Override
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
index be383bc..4e748b5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
@@ -41,7 +41,7 @@ public abstract class YamlDetectionConfigTranslator {
 
   public YamlDetectionConfigTranslator withExistingDetectionConfig(DetectionConfigDTO existingDTO) {
     this.existingConfig = existingDTO;
-    this.existingComponentSpecs = existingDTO.getComponentSpecs();
+    if(existingDTO != null) this.existingComponentSpecs = existingDTO.getComponentSpecs();
     return this;
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
index 259714b..3ab02aa 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
@@ -37,11 +37,15 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
 
 @Path("/yaml")
 public class YamlResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(YamlResource.class);
+
   private static final String PROP_NAME = "detectionName";
   private static final String PROP_TYPE = "type";
   private static final String PROP_DETECTION_CONFIG_ID = "detectionConfigIds";
@@ -110,6 +114,7 @@ public class YamlResource {
     try{
       detectionConfig = translator.withTrainingWindow(startTime, endTime).withExistingDetectionConfig(existingDetectionConfig).generateDetectionConfig();
     } catch (Exception e) {
+      LOG.error("yaml translation error", e);
       return Response.status(400).entity(ImmutableMap.of("status", "400", "message", e.getMessage())).build();
     }
     detectionConfig.setYaml(payload);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java
index 06a7224..3abce8b 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java
@@ -48,7 +48,7 @@ public class CompositePipelineConfigTranslatorTest {
 
 
   @Test
-  public void testBuildDetectionPropertiesMultipleRules() throws Exception {
+  public void testBuildDetectionPropertiesFull() throws Exception {
     this.yamlConfig = (Map<String, Object>) this.yaml.load(this.getClass().getResourceAsStream("pipeline-config-1.yaml"));
     CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator(this.yamlConfig, this.provider);
     YamlTranslationResult result = translator.translateYaml();
@@ -71,7 +71,6 @@ public class CompositePipelineConfigTranslatorTest {
     this.yamlConfig.put("rules", Collections.singletonList(
         ImmutableMap.of("name", "rule2","detection", Collections.singletonList(ImmutableMap.of("change", 0.3)))));
     CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator(this.yamlConfig, this.provider);
-
     translator.generateDetectionConfig();
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
index 33d20a0..050043c 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
@@ -1,60 +1,75 @@
 {
   "properties" : {
     "className" : "com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+    "maxGap" : 0,
     "nested" : [ {
+      "nestedMetricUrns" : [ "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3" ],
       "className" : "com.linkedin.thirdeye.detection.algorithm.DimensionWrapper",
       "metricUrn" : "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
       "nested" : [ {
-        "filter" : "$rule1_THRESHOLD_RULE_FILTER",
+        "filter" : "$rule1:THRESHOLD_RULE_FILTER:1",
         "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyFilterWrapper",
         "nested" : [ {
-          "baselineValueProvider" : "$rule1_THRESHOLD_RULE_BASELINE",
-          "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
+          "filter" : "$rule1:THRESHOLD_RULE_FILTER:0",
+          "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyFilterWrapper",
           "nested" : [ {
-            "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-            "detector" : "$rule1_THRESHOLD"
+            "baselineValueProvider" : "$rule1:RULE_BASELINE:0",
+            "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
+            "maxGap" : 0,
+            "nested" : [ {
+              "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
+              "detector" : "$rule1:THRESHOLD:0"
+            } ],
+            "maxDuration" : 100
           } ]
         } ]
       }, {
-        "filter" : "$rule2_THRESHOLD_RULE_FILTER",
+        "filter" : "$rule2:THRESHOLD_RULE_FILTER:0",
         "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyFilterWrapper",
         "nested" : [ {
-          "baselineValueProvider" : "$rule2_THRESHOLD_RULE_BASELINE",
+          "baselineValueProvider" : "$rule2:RULE_BASELINE:0",
           "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
+          "maxGap" : 0,
           "nested" : [ {
             "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-            "detector" : "$rule2_THRESHOLD"
-          } ]
+            "detector" : "$rule2:THRESHOLD:0"
+          } ],
+          "maxDuration" : 100
         } ]
       } ],
       "minContribution" : 0.05,
       "dimensions" : [ "D1", "D2" ]
-    } ]
+    } ],
+    "maxDuration" : 100
   },
   "components" : {
-    "rule1_THRESHOLD" : {
+    "rule1:RULE_BASELINE:0" : {
       "max" : 100,
-      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
+      "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
     },
-    "rule2_THRESHOLD" : {
-      "max" : 100,
-      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
+    "rule1:THRESHOLD_RULE_FILTER:0" : {
+      "min" : 50,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleAnomalyFilter"
     },
-    "rule1_THRESHOLD_RULE_FILTER" : {
+    "rule1:THRESHOLD_RULE_FILTER:1" : {
       "min" : 50,
       "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleAnomalyFilter"
     },
-    "rule2_THRESHOLD_RULE_BASELINE" : {
+    "rule2:THRESHOLD_RULE_FILTER:0" : {
+      "min" : 50,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleAnomalyFilter"
+    },
+    "rule2:THRESHOLD:0" : {
       "max" : 100,
-      "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
     },
-    "rule1_THRESHOLD_RULE_BASELINE" : {
+    "rule2:RULE_BASELINE:0" : {
       "max" : 100,
       "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
     },
-    "rule2_THRESHOLD_RULE_FILTER" : {
-      "min" : 50,
-      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleAnomalyFilter"
+    "rule1:THRESHOLD:0" : {
+      "max" : 100,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
     }
   },
   "cron" : "0 0 14 * * ? *"
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
index 439d622..2d875a4 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
@@ -2,14 +2,15 @@
   "properties" : {
     "className" : "com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
     "nested" : [ {
+      "nestedMetricUrns" : [ "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3" ],
       "className" : "com.linkedin.thirdeye.detection.algorithm.DimensionWrapper",
       "metricUrn" : "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
       "nested" : [ {
-        "baselineValueProvider" : "$rule1_THRESHOLD_RULE_BASELINE",
+        "baselineValueProvider" : "$rule1:RULE_BASELINE:0",
         "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
         "nested" : [ {
           "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-          "detector" : "$rule1_THRESHOLD"
+          "detector" : "$rule1:THRESHOLD:0"
         } ]
       } ],
       "minContribution" : 0.05,
@@ -17,14 +18,14 @@
     } ]
   },
   "components" : {
-    "rule1_THRESHOLD" : {
+    "rule1:RULE_BASELINE:0" : {
       "max" : 100,
-      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
+      "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
     },
-    "rule1_THRESHOLD_RULE_BASELINE" : {
+    "rule1:THRESHOLD:0" : {
       "max" : 100,
-      "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
     }
   },
   "cron" : "0 0 14 * * ? *"
-}
\ No newline at end of file
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml
index 8b152f8..49c6463 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml
@@ -21,6 +21,11 @@ rules:
       max: 100
   filter:
   - type: THRESHOLD_RULE_FILTER
+    id: 0
+    params:
+      min: 50
+  - type: THRESHOLD_RULE_FILTER
+    id: 1
     params:
       min: 50
 - name: rule2
@@ -32,3 +37,6 @@ rules:
   - type: THRESHOLD_RULE_FILTER
     params:
       min: 50
+merger:
+  maxGap: 0
+  maxDuration: 100


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org