Posted to commits@pinot.apache.org by ki...@apache.org on 2018/11/12 23:10:20 UTC

[incubator-pinot] branch master updated: [TE] new detection pipeline - components (#3462)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f9317ec  [TE] new detection pipeline - components (#3462)
f9317ec is described below

commit f9317ec70613c565087ed58f6091c06f5a65b913
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Nov 12 15:10:15 2018 -0800

    [TE] new detection pipeline - components (#3462)
    
    * tuner
    
    * pipeline tuner
    
    * pipeline training
    
    * detection name
    
    * detection refactor
    
    * refactor
    
    * annotation change
    
    * reflections
    
    * move config id out of anomaly slice
    
    * yaml tuner
    
    * yaml annotation
    
    * yaml tests
    
    * comments/tests fix
    
    * tests
    
    * fix java doc
    
    * registry init
    
    * fix yaml alert test
---
 .../anomaly/ThirdEyeAnomalyApplication.java        |   4 +
 .../dashboard/ThirdEyeDashboardApplication.java    |   4 +
 .../dashboard/resources/v2/ResourceUtils.java      |   1 -
 .../resources/v2/RootCauseMetricResource.java      |   5 -
 .../thirdeye/dataframe/util/DataFrameUtils.java    |   4 -
 .../thirdeye/dataframe/util/MetricSlice.java       |   3 +-
 .../thirdeye/datalayer/dto/DetectionConfigDTO.java |  11 +
 .../datalayer/pojo/DetectionConfigBean.java        |   9 +
 .../linkedin/thirdeye/detection/DataProvider.java  |  17 +-
 .../thirdeye/detection/DefaultDataProvider.java    |  36 ++--
 .../thirdeye/detection/DetectionPipeline.java      |  59 +++++-
 .../thirdeye/detection/DetectionResource.java      |  15 +-
 ...malyDetectionStage.java => DetectionUtils.java} | 105 ++++-----
 .../thirdeye/detection/InputDataFetcher.java       |  79 +++++++
 .../detection/StaticDetectionPipeline.java         |  12 +-
 .../alert/StatefulDetectionAlertFilter.java        |   6 +-
 .../detection/alert/filter/LegacyAlertFilter.java  |   8 +-
 .../detection/algorithm/BaselineAlgorithm.java     |   4 +-
 .../detection/algorithm/DimensionWrapper.java      |   1 +
 .../algorithm/LegacyAnomalyFunctionAlgorithm.java  |   6 +-
 .../detection/algorithm/LegacyMergeWrapper.java    |   5 +-
 .../thirdeye/detection/algorithm/MergeWrapper.java |   7 +-
 .../detection/algorithm/MovingWindowAlgorithm.java |   7 +-
 .../detection/algorithm/ThresholdAlgorithm.java    |   4 +-
 .../stage/AnomalyDetectionStageWrapper.java        |   3 +-
 .../algorithm/stage/AnomalyFilterStageWrapper.java |   1 -
 .../stage/BaselineRuleDetectionStage.java          |  16 +-
 .../algorithm/stage/BaselineRuleFilterStage.java   |   4 +-
 .../detection/algorithm/stage/StageUtils.java      |  61 ------
 .../stage/StaticAnomalyDetectionStage.java         |  15 +-
 .../algorithm/stage/StaticAnomalyFilterStage.java  |  18 +-
 .../algorithm/stage/StaticGrouperStage.java        |  50 -----
 .../stage/ThresholdRuleDetectionStage.java         |  12 +-
 .../algorithm/stage/ThresholdRuleFilterStage.java  |   8 +-
 .../annotation/{Detection.java => Components.java} |  10 +-
 .../detection/annotation/DetectionRegistry.java    |  93 ++++++--
 .../annotation/{DetectionParam.java => Param.java} |   2 +-
 .../annotation/{Detection.java => Tune.java}       |  21 +-
 .../annotation/{Detection.java => Yaml.java}       |  21 +-
 .../detection/baseline/BaselineProvider.java       |  51 -----
 .../detection/baseline/RuleBaselineProvider.java   |  97 ---------
 .../detection/baseline/StaticBaselineProvider.java | 105 ---------
 .../detection/components/RuleBaselineProvider.java |  68 ++++++
 .../components/ThresholdRuleAnomalyFilter.java     |  77 +++++++
 .../components/ThresholdRuleDetector.java          |  92 ++++++++
 .../finetune/GridSearchTuningAlgorithm.java        |   6 +-
 .../detection/finetune/TuningAlgorithm.java        |   4 +-
 .../AbstractSpec.java}                             |  21 +-
 .../detection/spec/RuleBaselineProviderSpec.java   |  50 +++++
 .../spec/ThresholdRuleDetectorSpec.java}           |  26 ++-
 .../spec/ThresholdRuleFilterSpec.java}             |  26 ++-
 .../components/AnomalyDetector.java}               |  31 ++-
 .../spi/components/AnomalyFilter.java}             |  15 +-
 .../spi/components/BaseComponent.java}             |  11 +-
 .../detection/spi/components/BaselineProvider.java |  44 ++++
 .../spi/components/Grouper.java}                   |  18 +-
 .../components/Tunable.java}                       |  29 +--
 .../detection/{ => spi/model}/AnomalySlice.java    |  29 +--
 .../detection/{ => spi/model}/EventSlice.java      |   4 +-
 .../detection/{ => spi/model}/InputData.java       |  43 +++-
 .../detection/{ => spi/model}/InputDataSpec.java   |  61 +++++-
 .../thirdeye/detection/spi/model/TimeSeries.java   |  64 ++++++
 .../AnomalyDetectorWrapper.java}                   |  57 +++--
 .../AnomalyFilterWrapper.java}                     |  39 ++--
 .../BaselineFillingMergeWrapper.java               |  59 +++---
 .../ChildKeepingMergeWrapper.java                  |  10 +-
 .../yaml/CompositePipelineConfigTranslator.java    | 236 +++++++++++++++------
 .../yaml/YamlDetectionConfigTranslator.java        |  67 +++---
 .../yaml/YamlDetectionTranslatorLoader.java        |  18 +-
 .../thirdeye/detection/yaml/YamlResource.java      |  62 ++++--
 .../detection/yaml/YamlTranslationResult.java      |  84 ++++++++
 .../thirdeye/rootcause/impl/MetricEntity.java      |   1 -
 .../dashboard/views/admin/thirdeye-admin.ftl       |   4 +-
 .../thirdeye/detection/DataProviderTest.java       |  16 +-
 .../thirdeye/detection/MockDataProvider.java       |  17 +-
 .../detection/algorithm/BaselineAlgorithmTest.java |   7 +-
 .../LegacyAnomalyFunctionAlgorithmTest.java        |   1 -
 .../algorithm/MovingWindowAlgorithmTest.java       |   3 +-
 .../stage/BaselineRuleDetectionStageTest.java      |   1 -
 .../detection/baseline/MockBaselineProvider.java   |  50 -----
 .../detection/components/MockBaselineProvider.java |  44 ++++
 .../RuleBaselineProviderTest.java                  |  20 +-
 .../components/ThresholdRuleAnomalyFilterTest.java | 170 +++++++++++++++
 .../ThresholdRuleDetectorTest.java}                |  48 ++---
 .../finetune/GridSearchTuningAlgorithmTest.java    |   4 +-
 .../MergeDimensionThresholdIntegrationTest.java    |   6 +-
 .../thirdeye/detection/spec/AbstractSpecTest.java  |  61 ++++++
 .../detection/spec/MockBaselineProviderSpec.java   |  43 ++++
 .../linkedin/thirdeye/detection/spec/TestSpec.java |  60 ++++++
 .../wrapper/AnomalyDetectorWrapperTest.java        |  92 ++++++++
 .../BaselineFillingMergeWrapperTest.java           |  21 +-
 .../ChildKeepingMergeWrapperTest.java              |   4 +-
 .../CompositePipelineConfigTranslatorTest.java     | 182 ++++------------
 .../yaml/MockYamlDetectionConfigTranslator.java    |   8 +-
 .../YamlDetectionAlertConfigTranslatorTest.java    |   4 +-
 .../yaml/YamlDetectionConfigTranslatorTest.java    |   7 +-
 .../compositePipelineTranslatorTestResult-1.json   |  61 ++++++
 .../compositePipelineTranslatorTestResult-2.json   |  30 +++
 .../thirdeye/detection/yaml/pipeline-config-1.yaml |  34 +++
 .../thirdeye/detection/yaml/pipeline-config-2.yaml |  22 ++
 .../thirdeye/detection/yaml/pipeline-config.yaml   |  29 ---
 101 files changed, 2233 insertions(+), 1168 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/ThirdEyeAnomalyApplication.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/ThirdEyeAnomalyApplication.java
index 25ff973..888d632 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/ThirdEyeAnomalyApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/ThirdEyeAnomalyApplication.java
@@ -38,6 +38,7 @@ import com.linkedin.thirdeye.datasource.ThirdEyeCacheRegistry;
 import com.linkedin.thirdeye.datasource.pinot.resources.PinotDataSourceResource;
 import com.linkedin.thirdeye.detection.DetectionPipelineScheduler;
 import com.linkedin.thirdeye.detection.alert.DetectionAlertScheduler;
+import com.linkedin.thirdeye.detection.annotation.DetectionRegistry;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
 import com.linkedin.thirdeye.tracking.RequestStatisticsLogger;
@@ -112,6 +113,9 @@ public class ThirdEyeAnomalyApplication
       LOG.error("Exception while loading caches", e);
     }
 
+    // instantiate detection registry
+    DetectionRegistry.init();
+
     environment.getObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
     environment.getObjectMapper().registerModule(makeMapperModule());
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index 701c2c6..4349587 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -69,6 +69,7 @@ import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
 import com.linkedin.thirdeye.detection.DetectionMigrationResource;
 import com.linkedin.thirdeye.detection.DetectionResource;
 import com.linkedin.thirdeye.detection.annotation.DetectionConfigurationResource;
+import com.linkedin.thirdeye.detection.annotation.DetectionRegistry;
 import com.linkedin.thirdeye.detection.yaml.YamlResource;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
@@ -150,6 +151,9 @@ public class ThirdEyeDashboardApplication
       LOG.error("Exception while loading caches", e);
     }
 
+    // instantiate detection registry
+    DetectionRegistry.init();
+
     AnomalyFunctionFactory anomalyFunctionFactory = new AnomalyFunctionFactory(config.getFunctionConfigPath());
     AlertFilterFactory alertFilterFactory = new AlertFilterFactory(config.getAlertFilterConfigPath());
     AlertFilterAutotuneFactory alertFilterAutotuneFactory = new AlertFilterAutotuneFactory(config.getFilterAutotuneConfigPath());
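
The two hunks above wire DetectionRegistry.init() into application startup so that component classes are discovered (via the Reflections-based scan mentioned in the commit message) before any pipeline is constructed. The real registry is detection/annotation/DetectionRegistry.java; the following is only a minimal sketch of that eager, annotation-driven pattern, and the @Components field name and scanned package are assumptions rather than the committed API.

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.reflections.Reflections;

    // Stand-in annotation for illustration only; the real one is
    // detection/annotation/Components.java from this commit.
    @Retention(RetentionPolicy.RUNTIME)
    @interface Components {
      String type();
    }

    public class ComponentRegistrySketch {
      private static final Map<String, String> REGISTRY = new ConcurrentHashMap<>();

      // Mirrors the idea behind DetectionRegistry.init(): scan once at startup and
      // map a component "type" name to the implementing class name.
      public static void init() {
        Reflections reflections = new Reflections("com.linkedin.thirdeye.detection.components");
        for (Class<?> clazz : reflections.getTypesAnnotatedWith(Components.class)) {
          REGISTRY.put(clazz.getAnnotation(Components.class).type(), clazz.getName());
        }
      }

      public static String lookupClassName(String type) {
        return REGISTRY.get(type);
      }
    }
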
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/ResourceUtils.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/ResourceUtils.java
index ef322e1..0d9fd3d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/ResourceUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/ResourceUtils.java
@@ -19,7 +19,6 @@ package com.linkedin.thirdeye.dashboard.resources.v2;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.TreeMultimap;
 import com.linkedin.thirdeye.api.TimeGranularity;
-import com.linkedin.thirdeye.constant.AnomalyFeedbackType;
 import com.linkedin.thirdeye.constant.AnomalyResultSource;
 import com.linkedin.thirdeye.dashboard.resources.v2.pojo.AnomalyClassificationType;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
index 0b7ad34..a8b2b3f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
@@ -29,9 +29,6 @@ import com.linkedin.thirdeye.datasource.loader.AggregationLoader;
 import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
-import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregate;
-import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import com.linkedin.thirdeye.rootcause.timeseries.BaselineNone;
 import com.wordnik.swagger.annotations.Api;
 import com.wordnik.swagger.annotations.ApiOperation;
 import com.wordnik.swagger.annotations.ApiParam;
@@ -45,8 +42,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import javax.validation.constraints.NotNull;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/DataFrameUtils.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/DataFrameUtils.java
index af854f5..2d8be7a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/DataFrameUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/DataFrameUtils.java
@@ -16,8 +16,6 @@
 
 package com.linkedin.thirdeye.dataframe.util;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.api.TimeGranularity;
 import com.linkedin.thirdeye.dashboard.Utils;
 import com.linkedin.thirdeye.dataframe.DataFrame;
@@ -43,10 +41,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeFieldType;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/MetricSlice.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/MetricSlice.java
index 230a078..4bcfd60 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/MetricSlice.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dataframe/util/MetricSlice.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,7 +19,6 @@ package com.linkedin.thirdeye.dataframe.util;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.api.TimeGranularity;
-import com.linkedin.thirdeye.constant.MetricAggFunction;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.builder.ToStringBuilder;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
index 8562868..979ca79 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
@@ -17,8 +17,19 @@
 package com.linkedin.thirdeye.datalayer.dto;
 
 import com.linkedin.thirdeye.datalayer.pojo.DetectionConfigBean;
+import com.linkedin.thirdeye.detection.spi.components.BaseComponent;
+import java.util.HashMap;
+import java.util.Map;
 
 
 public class DetectionConfigDTO extends DetectionConfigBean {
+  private Map<String, BaseComponent> components = new HashMap<>();
 
+  public Map<String, BaseComponent> getComponents() {
+    return components;
+  }
+
+  public void setComponents(Map<String, BaseComponent> components) {
+    this.components = components;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionConfigBean.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionConfigBean.java
index f36064f..a9b0392 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionConfigBean.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionConfigBean.java
@@ -36,6 +36,15 @@ public class DetectionConfigBean extends AbstractBean {
   Map<String, Object> properties;
   boolean active;
   String yaml;
+  Map<String, Object> componentSpecs;
+
+  public Map<String, Object> getComponentSpecs() {
+    return componentSpecs;
+  }
+
+  public void setComponentSpecs(Map<String, Object> componentSpecs) {
+    this.componentSpecs = componentSpecs;
+  }
 
   public String getYaml() {
     return yaml;
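
Taken together, the two data-model hunks above split the component configuration in two: DetectionConfigBean persists the raw componentSpecs map (component name to spec properties, including the className read later by DetectionPipeline), while DetectionConfigDTO carries the live BaseComponent instances built from those specs at runtime. A small sketch of assembling such a config by hand follows; the component name and the "max" property are illustrative, not a documented schema.

    import java.util.HashMap;
    import java.util.Map;
    import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;

    public class ComponentSpecExample {
      public static DetectionConfigDTO buildConfig() {
        // Only "className" is interpreted by DetectionPipeline.initComponents();
        // the remaining keys become fields of the component's spec object.
        Map<String, Object> detectorSpec = new HashMap<>();
        detectorSpec.put("className",
            "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector");
        detectorSpec.put("max", 100);  // illustrative spec property

        Map<String, Object> componentSpecs = new HashMap<>();
        componentSpecs.put("maxThreshold", detectorSpec);

        DetectionConfigDTO config = new DetectionConfigDTO();
        config.setComponentSpecs(componentSpecs);  // persisted with the config bean
        // config.getComponents() stays empty until a DetectionPipeline instantiates
        // and initializes the components from these specs.
        return config;
      }
    }
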
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
index 1f5ffea..51e1a91 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
@@ -24,6 +24,8 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.EventDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -70,9 +72,10 @@ public interface DataProvider {
    * @see AnomalySlice
    *
    * @param slices anomaly slice
+   * @param configId configId
    * @return multimap of anomalies (keyed by slice)
    */
-  Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices);
+  Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices, long configId);
 
   /**
    * Returns a multimap of events (keyed by slice) for a given set of slices.
@@ -95,6 +98,7 @@ public interface DataProvider {
    */
   Map<Long, MetricConfigDTO> fetchMetrics(Collection<Long> ids);
 
+
   /**
    * Returns a map of dataset configs (keyed by id) for a given set of dataset names.
    *
@@ -106,6 +110,17 @@ public interface DataProvider {
   Map<String, DatasetConfigDTO> fetchDatasets(Collection<String> datasetNames);
 
   /**
+   * Returns a metricConfigDTO for a given metric name.
+   *
+   * @see MetricConfigDTO
+   *
+   * @param metricName metric name
+   * @param datasetName dataset name
+   * @return map of dataset configs (keyed by dataset name)
+   */
+  MetricConfigDTO fetchMetric(String metricName, String datasetName);
+
+  /**
    * Returns an initialized instance of a detection pipeline for the given config. Injects this
    * DataProvider as provider for the new pipeline.
    *
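
The interface change above is the "move config id out of anomaly slice" item from the commit message: AnomalySlice now describes only the time range, and callers pass the owning detection config id to fetchAnomalies explicitly. A minimal caller-side sketch, matching the updated callers later in this diff:

    import java.util.Collection;
    import java.util.Collections;
    import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
    import com.linkedin.thirdeye.detection.DataProvider;
    import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;

    public class FetchAnomaliesExample {
      static Collection<MergedAnomalyResultDTO> existingAnomalies(
          DataProvider provider, long configId, long start, long end) {
        // The slice carries only the time range; the config id is passed separately,
        // so the same slice instance can be reused across detection configs.
        AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
        return provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
      }
    }
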
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index 8f8ae2f..4ea926b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -32,6 +32,8 @@ import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.datalayer.util.Predicate;
 import com.linkedin.thirdeye.datasource.loader.AggregationLoader;
 import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -121,16 +123,16 @@ public class DefaultDataProvider implements DataProvider {
   }
 
   @Override
-  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices) {
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
     Multimap<AnomalySlice, MergedAnomalyResultDTO> output = ArrayListMultimap.create();
     for (AnomalySlice slice : slices) {
       List<Predicate> predicates = new ArrayList<>();
-      if (slice.end >= 0)
-        predicates.add(Predicate.LT("startTime", slice.end));
-      if (slice.start >= 0)
-        predicates.add(Predicate.GT("endTime", slice.start));
-      if (slice.configId >= 0)
-        predicates.add(Predicate.EQ("detectionConfigId", slice.configId));
+      if (slice.getEnd() >= 0)
+        predicates.add(Predicate.LT("startTime", slice.getEnd()));
+      if (slice.getStart() >= 0)
+        predicates.add(Predicate.GT("endTime", slice.getStart()));
+      if (configId >= 0)
+        predicates.add(Predicate.EQ("detectionConfigId", configId));
 
       if (predicates.isEmpty())
         throw new IllegalArgumentException("Must provide at least one of start, end, or detectionConfigId");
@@ -138,7 +140,12 @@ public class DefaultDataProvider implements DataProvider {
       List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(AND(predicates));
       Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
       while (itAnomaly.hasNext()) {
-        if (!slice.match(itAnomaly.next())) {
+        MergedAnomalyResultDTO anomaly = itAnomaly.next();
+        if (configId >= 0 && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)){
+          itAnomaly.remove();
+        }
+
+        if (!slice.match(anomaly)) {
           itAnomaly.remove();
         }
       }
@@ -153,10 +160,10 @@ public class DefaultDataProvider implements DataProvider {
     Multimap<EventSlice, EventDTO> output = ArrayListMultimap.create();
     for (EventSlice slice : slices) {
       List<Predicate> predicates = new ArrayList<>();
-      if (slice.end >= 0)
-        predicates.add(Predicate.LT("startTime", slice.end));
-      if (slice.start >= 0)
-        predicates.add(Predicate.GT("endTime", slice.start));
+      if (slice.getEnd() >= 0)
+        predicates.add(Predicate.LT("startTime", slice.getEnd()));
+      if (slice.getStart() >= 0)
+        predicates.add(Predicate.GT("endTime", slice.getStart()));
 
       if (predicates.isEmpty())
         throw new IllegalArgumentException("Must provide at least one of start, or end");
@@ -204,6 +211,11 @@ public class DefaultDataProvider implements DataProvider {
     return this.loader.from(this, config, start, end);
   }
 
+  @Override
+  public MetricConfigDTO fetchMetric(String metricName, String datasetName) {
+    return this.metricDAO.findByMetricAndDataset(metricName, datasetName);
+  }
+
   private static Predicate AND(Collection<Predicate> predicates) {
     return Predicate.AND(predicates.toArray(new Predicate[predicates.size()]));
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipeline.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipeline.java
index 4c07352..36c4060 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipeline.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipeline.java
@@ -26,17 +26,23 @@ import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
+import com.linkedin.thirdeye.detection.spi.components.BaseComponent;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.collections.MapUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
+import static com.linkedin.thirdeye.detection.DetectionUtils.*;
 
 
 /**
@@ -44,16 +50,25 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
  * for implementing (intermittently stateful) executable pipelines on top of it.
  */
 public abstract class DetectionPipeline {
+  private static String PROP_CLASS_NAME = "className";
+  private static final Logger LOG = LoggerFactory.getLogger(DetectionPipeline.class);
+
   protected final DataProvider provider;
   protected final DetectionConfigDTO config;
   protected final long startTime;
   protected final long endTime;
 
-  protected DetectionPipeline(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+  protected DetectionPipeline(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
+  {
     this.provider = provider;
     this.config = config;
     this.startTime = startTime;
     this.endTime = endTime;
+    try {
+      this.initComponents();
+    } catch (Exception e) {
+      LOG.error("initialize components failed", e);
+    }
   }
 
   /**
@@ -65,6 +80,48 @@ public abstract class DetectionPipeline {
   public abstract DetectionPipelineResult run() throws Exception;
 
   /**
+   * Initialize all components in the pipeline
+   * @throws Exception
+   */
+  private void initComponents() throws Exception {
+    InputDataFetcher dataFetcher = new InputDataFetcher(this.provider, this.config.getId());
+    Map<String, BaseComponent> instancesMap = config.getComponents();
+    Map<String, Object> componentSpecs = config.getComponentSpecs();
+    if (componentSpecs != null) {
+      for (String componentName : componentSpecs.keySet()) {
+        Map<String, Object> componentSpec = MapUtils.getMap(componentSpecs, componentName);
+        if (!instancesMap.containsKey(componentName)){
+          instancesMap.put(componentName, createComponent(componentSpec));
+        }
+      }
+
+      for (String componentName : componentSpecs.keySet()) {
+        Map<String, Object> componentSpec = MapUtils.getMap(componentSpecs, componentName);
+        for (Map.Entry<String, Object> entry : componentSpec.entrySet()){
+          if (DetectionUtils.isReferenceName(entry.getValue().toString())) {
+            String refComponentName = DetectionUtils.getComponentName(entry.getValue().toString());
+            componentSpec.put(entry.getKey(), instancesMap.get(refComponentName));
+          }
+        }
+        instancesMap.get(componentName).init(getComponentSpec(componentSpec), dataFetcher);
+      }
+    }
+    config.setComponents(instancesMap);
+  }
+
+  private BaseComponent createComponent(Map<String, Object> componentSpec)
+      throws Exception {
+    Class<BaseComponent> clazz = (Class<BaseComponent>) Class.forName(MapUtils.getString(componentSpec, PROP_CLASS_NAME));
+    return clazz.newInstance();
+  }
+
+  private AbstractSpec getComponentSpec(Map<String, Object> componentSpec) throws Exception {
+    Class clazz = Class.forName(MapUtils.getString(componentSpec, PROP_CLASS_NAME));
+    Class<AbstractSpec> specClazz = (Class<AbstractSpec>) Class.forName(getSpecClassName(clazz));
+    return AbstractSpec.fromProperties(componentSpec, specClazz);
+  }
+
+  /**
    * Helper for creating an anomaly for a given metric slice. Injects properties such as
    * metric name, filter dimensions, etc.
    *
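
initComponents() above runs in two passes: the first instantiates every entry of componentSpecs from its className, the second rewrites any spec value of the form "$name" to the instance created under that name (via DetectionUtils.isReferenceName/getComponentName) and then calls init(spec, dataFetcher) on each component. Below is a sketch of specs that exercise that reference convention; the component names and the "baseline" key are illustrative, not the committed YAML schema.

    import java.util.HashMap;
    import java.util.Map;

    public class ComponentReferenceExample {
      public static Map<String, Object> buildSpecs() {
        Map<String, Object> baseline = new HashMap<>();
        baseline.put("className",
            "com.linkedin.thirdeye.detection.components.RuleBaselineProvider");

        Map<String, Object> detector = new HashMap<>();
        detector.put("className",
            "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector");
        // Pass 1 creates both instances; pass 2 replaces "$baseline" with the
        // RuleBaselineProvider instance before init(spec, dataFetcher) is invoked.
        detector.put("baseline", "$baseline");

        Map<String, Object> specs = new HashMap<>();
        specs.put("baseline", baseline);
        specs.put("detector", detector);
        return specs;
      }
    }
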
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index 8c9518d..e6dcad4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -33,6 +33,7 @@ import com.linkedin.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
 import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
 import com.linkedin.thirdeye.detection.finetune.GridSearchTuningAlgorithm;
 import com.linkedin.thirdeye.detection.finetune.TuningAlgorithm;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.wordnik.swagger.annotations.ApiParam;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -47,6 +48,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,7 +102,8 @@ public class DetectionResource {
     DetectionConfigDTO config = new DetectionConfigDTO();
     config.setId(Long.MAX_VALUE);
     config.setName("preview");
-    config.setProperties(properties);
+    config.setProperties(MapUtils.getMap(properties, "properties"));
+    config.setComponentSpecs(MapUtils.getMap(properties, "componentSpecs"));
 
     DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
     DetectionPipelineResult result = pipeline.run();
@@ -127,10 +130,10 @@ public class DetectionResource {
 
     LinkedHashMap<String, List<Number>> parameters = (LinkedHashMap<String, List<Number>>) json.get("parameters");
 
-    AnomalySlice slice = new AnomalySlice().withConfigId(configId).withStart(start).withEnd(end);
+    AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
 
     TuningAlgorithm gridSearch = new GridSearchTuningAlgorithm(OBJECT_MAPPER.writeValueAsString(json.get("properties")), parameters);
-    gridSearch.fit(slice);
+    gridSearch.fit(slice, configId);
 
     return Response.ok(gridSearch.bestDetectionConfig().getProperties()).build();
   }
@@ -171,8 +174,8 @@ public class DetectionResource {
     }
 
     // clear existing anomalies
-    AnomalySlice slice = new AnomalySlice().withConfigId(configId).withStart(start).withEnd(end);
-    Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice)).get(slice);
+    AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+    Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
 
     List<Long> existingIds = new ArrayList<>();
     for (MergedAnomalyResultDTO anomaly : existing) {
@@ -199,4 +202,4 @@ public class DetectionResource {
 
     return Response.ok(result).build();
   }
-}
+  }
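
With this change the preview endpoint expects the request body to carry the wrapper properties and the component specs under separate top-level keys, and the grid-search tuner receives the detection config id alongside the slice. A hedged sketch of assembling such a body; the inner contents are placeholders, not a documented schema.

    import java.util.HashMap;
    import java.util.Map;

    public class PreviewPayloadExample {
      // Matches the keys read via MapUtils.getMap(properties, "properties") and
      // MapUtils.getMap(properties, "componentSpecs") in DetectionResource above.
      public static Map<String, Object> buildBody(Map<String, Object> pipelineProperties,
          Map<String, Object> componentSpecs) {
        Map<String, Object> body = new HashMap<>();
        body.put("properties", pipelineProperties);
        body.put("componentSpecs", componentSpecs);
        return body;
      }
    }
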
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyDetectionStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
similarity index 64%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyDetectionStage.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
index db96e94..f1bf20d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyDetectionStage.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
@@ -14,83 +14,79 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm.stage;
+package com.linkedin.thirdeye.detection;
 
+import com.google.common.collect.Multimap;
+import com.linkedin.thirdeye.api.DimensionMap;
 import com.linkedin.thirdeye.dataframe.BooleanSeries;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.LongSeries;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.EventDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import com.linkedin.thirdeye.datalayer.pojo.MetricConfigBean;
 import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
+import com.linkedin.thirdeye.detection.spi.components.BaseComponent;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.EventSlice;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import java.lang.reflect.ParameterizedType;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
-import static com.linkedin.thirdeye.detection.algorithm.stage.StageUtils.*;
 
 
-/**
- * Static Anomaly detection stage. High level interface for anomaly detection stage.
- */
-public abstract class StaticAnomalyDetectionStage implements AnomalyDetectionStage {
-  private DataProvider provider;
-
-  /**
-   * Returns a data spec describing all required data(time series, aggregates, existing anomalies) to perform a stage.
-   * Data is retrieved in one pass and cached between executions if possible.
-   * @return input data spec
-   */
-  abstract InputDataSpec getInputDataSpec();
-
-  /**
-   * Run detection in the specified time range and return a list of anomalies
-   * @param data data(time series, anomalies, etc.) as described by data spec
-   * @return list of anomalies
-   */
-  abstract List<MergedAnomalyResultDTO> runDetection(InputData data);
+public class DetectionUtils {
+  // TODO anomaly should support multimap
+  public static DimensionMap toFilterMap(Multimap<String, String> filters) {
+    DimensionMap map = new DimensionMap();
+    for (Map.Entry<String, String> entry : filters.entries()) {
+      map.put(entry.getKey(), entry.getValue());
+    }
+    return map;
+  }
 
-  @Override
-  public final List<MergedAnomalyResultDTO> runDetection(DataProvider provider) {
-    this.provider = provider;
-    return this.runDetection(getDataForSpec(provider, this.getInputDataSpec()));
+  // Check if a string is a component reference
+  public static boolean isReferenceName(String key) {
+    return key.startsWith("$");
   }
 
-  /**
-   * Helper for creating an anomaly for a given metric slice. Injects properties such as
-   * metric name, filter dimensions, etc.
-   *
-   * @param slice metric slice
-   * @return anomaly template
-   */
-  protected final MergedAnomalyResultDTO makeAnomaly(MetricSlice slice) {
-    MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
-    anomaly.setStartTime(slice.getStart());
-    anomaly.setEndTime(slice.getEnd());
+  // get the component name from the reference key
+  public static String getComponentName(String key) {
+    return key.substring(1);
+  }
 
-    return anomaly;
+  // get the spec class name for a component class
+  public static String getSpecClassName(Class<BaseComponent> componentClass) {
+    ParameterizedType genericSuperclass = (ParameterizedType) componentClass.getGenericInterfaces()[0];
+    return (genericSuperclass.getActualTypeArguments()[0].getTypeName());
   }
 
+
   /**
-   * Helper for creating a list of anomalies from a boolean series. Injects properties via
-   * {@code makeAnomaly(MetricSlice, MetricConfigDTO, Long)}.
+   * Helper for creating a list of anomalies from a boolean series.
    *
    * @param slice metric slice
    * @param df time series with COL_TIME and at least one boolean value series
    * @param seriesName name of the value series
-   * @param configId configuration id of this pipeline
    * @param endTime end time of this detection window
+   * @param dataset dataset config for the metric
    * @return list of anomalies
    */
-  protected final List<MergedAnomalyResultDTO> makeAnomalies(MetricSlice slice, DataFrame df, String seriesName, Long configId, long endTime) {
+  public static List<MergedAnomalyResultDTO> makeAnomalies(MetricSlice slice, DataFrame df, String seriesName, long endTime, DatasetConfigDTO dataset) {
     if (df.isEmpty()) {
       return Collections.emptyList();
     }
@@ -101,13 +97,6 @@ public abstract class StaticAnomalyDetectionStage implements AnomalyDetectionSta
       return Collections.emptyList();
     }
 
-    Map<Long, MetricConfigDTO> metrics = this.provider.fetchMetrics(Collections.singleton(slice.getMetricId()));
-    if (!metrics.containsKey(slice.getMetricId())) {
-      throw new IllegalArgumentException(String.format("Could not resolve metric id %d", slice.getMetricId()));
-    }
-
-    MetricConfigDTO metric = metrics.get(slice.getMetricId());
-
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     LongSeries sTime = df.getLongs(COL_TIME);
     BooleanSeries sVal = df.getBooleans(seriesName);
@@ -137,7 +126,6 @@ public abstract class StaticAnomalyDetectionStage implements AnomalyDetectionSta
       long end = start + 1;
 
       // guess-timate of next time series timestamp
-      DatasetConfigDTO dataset = this.provider.fetchDatasets(Collections.singleton(metric.getDataset())).get(metric.getDataset());
       if (dataset != null) {
         Period period = dataset.bucketTimeGranularity().toPeriod();
         DateTimeZone timezone = DateTimeZone.forID(dataset.getTimezone());
@@ -155,4 +143,19 @@ public abstract class StaticAnomalyDetectionStage implements AnomalyDetectionSta
 
     return anomalies;
   }
+
+  /**
+   * Helper for creating an anomaly for a given metric slice. Injects properties such as
+   * metric name, filter dimensions, etc.
+   *
+   * @param slice metric slice
+   * @return anomaly template
+   */
+  public static MergedAnomalyResultDTO makeAnomaly(MetricSlice slice) {
+    MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+    anomaly.setStartTime(slice.getStart());
+    anomaly.setEndTime(slice.getEnd());
+
+    return anomaly;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputDataFetcher.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputDataFetcher.java
new file mode 100644
index 0000000..10c2e14
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputDataFetcher.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection;
+
+import com.google.common.collect.Multimap;
+import com.linkedin.thirdeye.dataframe.DataFrame;
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.EventDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.EventSlice;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Input data fetcher.
+ * For components to fetch the input data it need.
+ */
+public class InputDataFetcher {
+  private DataProvider provider;
+  private long configId;
+
+  public InputDataFetcher(DataProvider provider, long configId) {
+    this.provider = provider;
+    this.configId = configId;
+  }
+
+  /**
+   * Fetch data for input data spec
+   */
+  public InputData fetchData(InputDataSpec inputDataSpec) {
+    Map<MetricSlice, DataFrame> timeseries = provider.fetchTimeseries(inputDataSpec.getTimeseriesSlices());
+    Map<MetricSlice, DataFrame> aggregates =
+        provider.fetchAggregates(inputDataSpec.getAggregateSlices(), Collections.<String>emptyList());
+    Multimap<AnomalySlice, MergedAnomalyResultDTO> existingAnomalies =
+        provider.fetchAnomalies(inputDataSpec.getAnomalySlices(), configId);
+    Multimap<EventSlice, EventDTO> events = provider.fetchEvents(inputDataSpec.getEventSlices());
+    Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(inputDataSpec.getMetricIds());
+    Map<String, DatasetConfigDTO> datasets = provider.fetchDatasets(inputDataSpec.getDatasetNames());
+
+    Map<Long, DatasetConfigDTO> datasetForMetricId = fetchDatasetForMetricId(provider, inputDataSpec);
+    return new InputData(inputDataSpec, timeseries, aggregates, existingAnomalies, events, metrics, datasets, datasetForMetricId);
+  }
+
+  private Map<Long, DatasetConfigDTO> fetchDatasetForMetricId(DataProvider provider, InputDataSpec inputDataSpec) {
+    Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(inputDataSpec.getMetricIdsForDatasets());
+    Map<Long, String> metricIdToDataSet = new HashMap<>();
+    for (Map.Entry<Long, MetricConfigDTO> entry : metrics.entrySet()){
+      metricIdToDataSet.put(entry.getKey(), entry.getValue().getDataset());
+    }
+    Map<String, DatasetConfigDTO> datasets = provider.fetchDatasets(metricIdToDataSet.values());
+    Map<Long, DatasetConfigDTO> result = new HashMap<>();
+    for (Map.Entry<Long, MetricConfigDTO> entry : metrics.entrySet()){
+      result.put(entry.getKey(), datasets.get(entry.getValue().getDataset()));
+    }
+    return result;
+  }
+
+}
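
InputDataFetcher is the component-facing counterpart of DataProvider: it is constructed with the owning config id, so a component can describe what it needs through an InputDataSpec without ever threading configId through its own interface. The call pattern below is a rough sketch; the withTimeseriesSlices builder and the getTimeseries getter are assumed accessor names, not verified signatures from this commit.

    import java.util.Collections;
    import com.linkedin.thirdeye.dataframe.DataFrame;
    import com.linkedin.thirdeye.dataframe.util.MetricSlice;
    import com.linkedin.thirdeye.detection.InputDataFetcher;
    import com.linkedin.thirdeye.detection.spi.model.InputData;
    import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;

    public class FetchPatternSketch {
      private final InputDataFetcher dataFetcher;  // handed to the component in init()

      FetchPatternSketch(InputDataFetcher dataFetcher) {
        this.dataFetcher = dataFetcher;
      }

      DataFrame fetchSlice(MetricSlice slice) {
        InputDataSpec spec = new InputDataSpec()
            .withTimeseriesSlices(Collections.singleton(slice));  // assumed builder
        InputData data = this.dataFetcher.fetchData(spec);
        return data.getTimeseries().get(slice);                   // assumed getter
      }
    }
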
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/StaticDetectionPipeline.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/StaticDetectionPipeline.java
index 4c298eb..6b52761 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/StaticDetectionPipeline.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/StaticDetectionPipeline.java
@@ -18,10 +18,14 @@ package com.linkedin.thirdeye.detection;
 
 import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.dataframe.DataFrame;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.EventDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import java.util.Collections;
 import java.util.Map;
 
@@ -71,10 +75,10 @@ public abstract class StaticDetectionPipeline extends DetectionPipeline {
   @Override
   public final DetectionPipelineResult run() throws Exception {
     InputDataSpec dataSpec = this.getInputDataSpec();
-    Map<MetricSlice, DataFrame> timeseries = this.provider.fetchTimeseries(dataSpec.timeseriesSlices);
-    Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(dataSpec.aggregateSlices, Collections.<String>emptyList());
-    Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies = this.provider.fetchAnomalies(dataSpec.anomalySlices);
-    Multimap<EventSlice, EventDTO> events = this.provider.fetchEvents(dataSpec.eventSlices);
+    Map<MetricSlice, DataFrame> timeseries = this.provider.fetchTimeseries(dataSpec.getTimeseriesSlices());
+    Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(dataSpec.getAggregateSlices(), Collections.<String>emptyList());
+    Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies = this.provider.fetchAnomalies(dataSpec.getAnomalySlices(), this.config.getId());
+    Multimap<EventSlice, EventDTO> events = this.provider.fetchEvents(dataSpec.getEventSlices());
 
     InputData data = new InputData(
         dataSpec, timeseries, aggregates, anomalies, events);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
index 9f721f4..89db673 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
@@ -20,7 +20,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.DataProvider;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,8 +50,8 @@ public abstract class StatefulDetectionAlertFilter extends DetectionAlertFilter
     for (Long detectionConfigId : vectorClocks.keySet()) {
       long startTime = vectorClocks.get(detectionConfigId);
 
-      AnomalySlice slice = new AnomalySlice().withConfigId(detectionConfigId).withStart(startTime).withEnd(this.endTime);
-      Collection<MergedAnomalyResultDTO> candidates = this.provider.fetchAnomalies(Collections.singletonList(slice)).get(slice);
+      AnomalySlice slice = new AnomalySlice().withStart(startTime).withEnd(this.endTime);
+      Collection<MergedAnomalyResultDTO> candidates = this.provider.fetchAnomalies(Collections.singletonList(slice), detectionConfigId).get(slice);
 
       Collection<MergedAnomalyResultDTO> anomalies =
           Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
index bf92508..05b0454 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
@@ -22,15 +22,13 @@ import com.google.common.collect.Collections2;
 import com.linkedin.thirdeye.datalayer.dto.AlertConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.ConfigUtils;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.alert.DetectionAlertFilter;
-import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterRecipients;
 import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
 import com.linkedin.thirdeye.detector.email.filter.BaseAlertFilter;
 import com.linkedin.thirdeye.detector.email.filter.DummyAlertFilter;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -76,9 +74,9 @@ public class LegacyAlertFilter extends DetectionAlertFilter {
       long startTime = MapUtils.getLong(this.vectorClocks, detectionConfigId, 0L);
 
       AnomalySlice slice =
-          new AnomalySlice().withConfigId(detectionConfigId).withStart(startTime).withEnd(this.endTime);
+          new AnomalySlice().withStart(startTime).withEnd(this.endTime);
       Collection<MergedAnomalyResultDTO> candidates =
-          this.provider.fetchAnomalies(Collections.singletonList(slice)).get(slice);
+          this.provider.fetchAnomalies(Collections.singletonList(slice), detectionConfigId).get(slice);
 
       Collection<MergedAnomalyResultDTO> anomalies =
           Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithm.java
index 49fc847..5776c1b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithm.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithm.java
@@ -25,8 +25,8 @@ import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.StaticDetectionPipeline;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
 import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregate;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java
index 5c1c759..28ed691 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -195,6 +195,7 @@ public class DimensionWrapper extends DetectionPipeline {
     nestedConfig.setId(this.config.getId());
     nestedConfig.setName(this.config.getName());
     nestedConfig.setProperties(properties);
+    nestedConfig.setComponents(this.config.getComponents());
 
     DetectionPipeline pipeline = this.provider.loadPipeline(nestedConfig, this.startTime, this.endTime);
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithm.java
index 65abea3..42d2116 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithm.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithm.java
@@ -36,7 +36,7 @@ import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
@@ -110,8 +110,8 @@ public class LegacyAnomalyFunctionAlgorithm extends DetectionPipeline {
     try {
       Collection<MergedAnomalyResultDTO> historyMergedAnomalies;
       if (this.anomalyFunction.useHistoryAnomaly() && config.getId() != null) {
-        AnomalySlice slice = new AnomalySlice().withConfigId(config.getId()).withStart(this.startTime).withEnd(this.endTime);
-        historyMergedAnomalies = this.provider.fetchAnomalies(Collections.singletonList(slice)).get(slice);
+        AnomalySlice slice = new AnomalySlice().withStart(this.startTime).withEnd(this.endTime);
+        historyMergedAnomalies = this.provider.fetchAnomalies(Collections.singletonList(slice), config.getId()).get(slice);
       } else {
         historyMergedAnomalies = Collections.emptyList();
       }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyMergeWrapper.java
index b6015e0..57f32f2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/LegacyMergeWrapper.java
@@ -38,7 +38,7 @@ import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.ConfigUtils;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
@@ -125,7 +125,6 @@ public class LegacyMergeWrapper extends DetectionPipeline {
     this.mergeConfig = mergeConfig;
     this.maxGap = mergeConfig.getSequentialAllowedGap();
     this.slice = new AnomalySlice()
-        .withConfigId(config.getId())
         .withStart(startTime)
         .withEnd(endTime);
 
@@ -170,7 +169,7 @@ public class LegacyMergeWrapper extends DetectionPipeline {
         .withEnd(this.getEndTime(generated) + this.maxGap);
 
     List<MergedAnomalyResultDTO> retrieved = new ArrayList<>();
-    retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice)).get(effectiveSlice));
+    retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), config.getId()).get(effectiveSlice));
 
     return new DetectionPipelineResult(this.merge(generated, retrieved));
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index cff2cdb..3f8b0de 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -20,7 +20,7 @@ import com.google.common.base.Preconditions;
 import com.linkedin.thirdeye.api.DimensionMap;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.ConfigUtils;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
@@ -79,7 +79,7 @@ public class MergeWrapper extends DetectionPipeline {
 
     this.maxGap = MapUtils.getLongValue(config.getProperties(), "maxGap", 0);
     this.maxDuration = MapUtils.getLongValue(config.getProperties(), "maxDuration", Long.MAX_VALUE);
-    this.slice = new AnomalySlice().withStart(startTime).withEnd(endTime).withConfigId(config.getId());
+    this.slice = new AnomalySlice().withStart(startTime).withEnd(endTime);
     this.nestedProperties = ConfigUtils.getList(config.getProperties().get(PROP_NESTED));
   }
 
@@ -99,6 +99,7 @@ public class MergeWrapper extends DetectionPipeline {
       nestedConfig.setId(this.config.getId());
       nestedConfig.setName(this.config.getName());
       nestedConfig.setProperties(properties);
+      nestedConfig.setComponents(this.config.getComponents());
 
       DetectionPipeline pipeline = this.provider.loadPipeline(nestedConfig, this.startTime, this.endTime);
 
@@ -116,7 +117,7 @@ public class MergeWrapper extends DetectionPipeline {
         .withEnd(this.getEndTime(generated) + this.maxGap + 1);
 
     List<MergedAnomalyResultDTO> retrieved = new ArrayList<>();
-    retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice)).get(effectiveSlice));
+    retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice));
 
     // merge
     List<MergedAnomalyResultDTO> all = new ArrayList<>();
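
Note on the provider API change visible in the two hunks above: the detection config id no longer rides on the AnomalySlice (withConfigId is gone) and is instead passed to fetchAnomalies explicitly. A minimal sketch of the new call pattern, with the variable names taken from the surrounding code:

    // Sketch only: anomaly lookup with the config id passed to the provider directly.
    AnomalySlice slice = new AnomalySlice()
        .withStart(startTime)
        .withEnd(endTime);
    // fetchAnomalies(...) now takes the detection config id as a second argument.
    Collection<MergedAnomalyResultDTO> anomalies =
        provider.fetchAnomalies(Collections.singleton(slice), config.getId()).get(slice);
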
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithm.java
index 7f4fd01..3981938 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithm.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithm.java
@@ -28,13 +28,13 @@ import com.linkedin.thirdeye.dataframe.util.DataFrameUtils;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.ConfigUtils;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.StaticDetectionPipeline;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
 import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregate;
@@ -141,7 +141,6 @@ public class MovingWindowAlgorithm extends StaticDetectionPipeline {
     this.sliceDetection = MetricSlice.from(me.getId(), detectionStart.getMillis(), endTime, me.getFilters());
 
     this.anomalySlice = new AnomalySlice()
-        .withConfigId(this.config.getId())
         .withStart(this.sliceData.getStart())
         .withEnd(this.sliceData.getEnd());
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/ThresholdAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/ThresholdAlgorithm.java
index 4b01dbe..09886d7 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/ThresholdAlgorithm.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/ThresholdAlgorithm.java
@@ -24,8 +24,8 @@ import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.StaticDetectionPipeline;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import java.util.Collections;
 import java.util.List;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyDetectionStageWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyDetectionStageWrapper.java
index 9646de4..a7793ed 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyDetectionStageWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyDetectionStageWrapper.java
@@ -27,6 +27,7 @@ import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
+import com.linkedin.thirdeye.detection.DetectionUtils;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.util.ThirdEyeUtils;
 import java.util.ArrayList;
@@ -119,7 +120,7 @@ public class AnomalyDetectionStageWrapper extends DetectionPipeline {
       anomaly.setMetricUrn(this.metricUrn);
       anomaly.setMetric(metric.getName());
       anomaly.setCollection(metric.getDataset());
-      anomaly.setDimensions(StageUtils.toFilterMap(me.getFilters()));
+      anomaly.setDimensions(DetectionUtils.toFilterMap(me.getFilters()));
     }
     return new DetectionPipelineResult(anomalies);
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyFilterStageWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyFilterStageWrapper.java
index 2eb8a8d..493d75f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyFilterStageWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyFilterStageWrapper.java
@@ -21,7 +21,6 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.datalayer.util.StringUtils;
 import com.linkedin.thirdeye.detection.ConfigUtils;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStage.java
index 4ceafad..627dfe1 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStage.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStage.java
@@ -22,12 +22,12 @@ import com.linkedin.thirdeye.dataframe.BooleanSeries;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
-import com.linkedin.thirdeye.detection.annotation.Detection;
-import com.linkedin.thirdeye.detection.annotation.DetectionParam;
+import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.annotation.Param;
 import com.linkedin.thirdeye.detection.annotation.DetectionTag;
 import com.linkedin.thirdeye.detection.annotation.PresentationOption;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
 import java.util.ArrayList;
@@ -42,7 +42,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
  * Simple baseline algorithm. Computes a multi-week aggregate baseline and compares
  * the current value based on relative change or absolute difference.
  */
-@Detection(name = "Baseline rule detection",
+@Components(title = "Baseline rule detection",
     type = "BASELINE",
     tags = {DetectionTag.RULE_FILTER},
     description = "Simple baseline algorithm. Computes a multi-week aggregate baseline and compares the current value "
@@ -52,9 +52,9 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
       @PresentationOption(name = "percentage change", template = "comparing ${offset} is more than ${change}")
     },
     params = {
-        @DetectionParam(name = "offset", defaultValue = "wo1w"),
-        @DetectionParam(name = "change", placeholder = "value"),
-        @DetectionParam(name = "difference", placeholder = "value")
+        @Param(name = "offset", defaultValue = "wo1w"),
+        @Param(name = "change", placeholder = "value"),
+        @Param(name = "difference", placeholder = "value")
     })
 public class BaselineRuleDetectionStage extends StaticAnomalyDetectionStage {
   private static final String COL_CURR = "current";
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleFilterStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleFilterStage.java
index 850abae..40459bd 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleFilterStage.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleFilterStage.java
@@ -23,7 +23,7 @@ import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.annotation.Detection;
+import com.linkedin.thirdeye.detection.annotation.Components;
 import com.linkedin.thirdeye.detection.annotation.DetectionTag;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
@@ -38,7 +38,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 /**
  * This filter stage filters the anomalies if either the absolute changeThreshold, percentage changeThreshold or site wide impact does not pass the threshold.
  */
-@Detection(name = "Baseline Filter",
+@Components(title = "Baseline Filter",
     type = "BUSINESS_RULE_FILTER",
     tags = {DetectionTag.RULE_FILTER},
     description = "Baseline rule filter. Filters the anomalies if percentage change, absolute difference or site wide impact is below certain threshold.")
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StageUtils.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StageUtils.java
deleted file mode 100644
index 10c7165..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StageUtils.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.linkedin.thirdeye.detection.algorithm.stage;
-
-import com.google.common.collect.Multimap;
-import com.linkedin.thirdeye.api.DimensionMap;
-import com.linkedin.thirdeye.dataframe.DataFrame;
-import com.linkedin.thirdeye.dataframe.util.MetricSlice;
-import com.linkedin.thirdeye.datalayer.dto.EventDTO;
-import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
-import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.EventSlice;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
-import java.util.Collections;
-import java.util.Map;
-
-
-public class StageUtils {
-  /**
-   * Get the data for a data spec from provider
-   * @param provider the data provider
-   * @param inputDataSpec the spec of input data for the detection stage
-   * @return data
-   */
-  public static InputData getDataForSpec(DataProvider provider, InputDataSpec inputDataSpec) {
-    Map<MetricSlice, DataFrame> timeseries = provider.fetchTimeseries(inputDataSpec.getTimeseriesSlices());
-    Map<MetricSlice, DataFrame> aggregates =
-        provider.fetchAggregates(inputDataSpec.getAggregateSlices(), Collections.<String>emptyList());
-    Multimap<AnomalySlice, MergedAnomalyResultDTO> existingAnomalies =
-        provider.fetchAnomalies(inputDataSpec.getAnomalySlices());
-    Multimap<EventSlice, EventDTO> events = provider.fetchEvents(inputDataSpec.getEventSlices());
-
-    return new InputData(inputDataSpec, timeseries, aggregates, existingAnomalies, events);
-  }
-
-  // TODO anomaly should support multimap
-  public static DimensionMap toFilterMap(Multimap<String, String> filters) {
-    DimensionMap map = new DimensionMap();
-    for (Map.Entry<String, String> entry : filters.entries()) {
-      map.put(entry.getKey(), entry.getValue());
-    }
-    return map;
-  }
-
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyDetectionStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyDetectionStage.java
index db96e94..952115c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyDetectionStage.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyDetectionStage.java
@@ -24,8 +24,9 @@ import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -35,7 +36,6 @@ import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
-import static com.linkedin.thirdeye.detection.algorithm.stage.StageUtils.*;
 
 
 /**
@@ -43,6 +43,12 @@ import static com.linkedin.thirdeye.detection.algorithm.stage.StageUtils.*;
  */
 public abstract class StaticAnomalyDetectionStage implements AnomalyDetectionStage {
   private DataProvider provider;
+  private long configId;
+
+  @Override
+  public void init(Map<String, Object> specs, Long configId, long startTime, long endTime) {
+    this.configId = configId;
+  }
 
   /**
    * Returns a data spec describing all required data(time series, aggregates, existing anomalies) to perform a stage.
@@ -61,7 +67,8 @@ public abstract class StaticAnomalyDetectionStage implements AnomalyDetectionSta
   @Override
   public final List<MergedAnomalyResultDTO> runDetection(DataProvider provider) {
     this.provider = provider;
-    return this.runDetection(getDataForSpec(provider, this.getInputDataSpec()));
+    InputDataFetcher dataFetcher = new InputDataFetcher(this.provider, this.configId);
+    return this.runDetection(dataFetcher.fetchData(this.getInputDataSpec()));
   }
 
   /**
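
The static StageUtils.getDataForSpec helper (deleted above) is replaced by an InputDataFetcher instance bound to the detection config id, so a stage wraps the DataProvider in a fetcher instead of calling the provider for each slice type itself. A minimal sketch of the new fetch pattern, using only names that appear in this commit (provider, configId and slice are illustrative local variables):

    // Sketch: fetching input data through the new InputDataFetcher.
    InputDataFetcher dataFetcher = new InputDataFetcher(provider, configId);
    InputData data = dataFetcher.fetchData(
        new InputDataSpec().withTimeseriesSlices(Collections.singletonList(slice)));
    DataFrame df = data.getTimeseries().get(slice);
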
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyFilterStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyFilterStage.java
index 1e70691..d350246 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyFilterStage.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticAnomalyFilterStage.java
@@ -18,16 +18,23 @@ package com.linkedin.thirdeye.detection.algorithm.stage;
 
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
-
-import static com.linkedin.thirdeye.detection.algorithm.stage.StageUtils.*;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import java.util.Map;
 
 
 /**
  * Static anomaly filter stage. High level interface for anomaly filter.
  */
 public abstract class StaticAnomalyFilterStage implements AnomalyFilterStage {
+  private long configId;
+
+  @Override
+  public void init(Map<String, Object> specs, Long configId, long startTime, long endTime) {
+    this.configId = configId;
+  }
+
   /**
    * Returns a data spec describing all required data(time series, aggregates, existing anomalies) to perform a stage.
    * Data is retrieved in one pass and cached between executions if possible.
@@ -45,6 +52,7 @@ public abstract class StaticAnomalyFilterStage implements AnomalyFilterStage {
 
   @Override
   public final boolean isQualified(MergedAnomalyResultDTO anomaly, DataProvider provider) {
-    return isQualified(anomaly, getDataForSpec(provider, this.getInputDataSpec()));
+    InputDataFetcher dataFetcher = new InputDataFetcher(provider, -1);
+    return isQualified(anomaly, dataFetcher.fetchData(this.getInputDataSpec()));
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticGrouperStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticGrouperStage.java
deleted file mode 100644
index 60efabb..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/StaticGrouperStage.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.linkedin.thirdeye.detection.algorithm.stage;
-
-import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
-import java.util.List;
-
-import static com.linkedin.thirdeye.detection.algorithm.stage.StageUtils.*;
-
-/**
- * Static Grouper stage. High level interface for grouper stage.
- */
-public abstract class StaticGrouperStage implements GrouperStage {
-  /**
-   * Returns a data spec describing all required data(time series, aggregates, existing anomalies) to perform a stage.
-   * Data is retrieved in one pass and cached between executions if possible.
-   * @return input data spec
-   */
-  abstract InputDataSpec getInputDataSpec();
-
-  /**
-   * group anomalies.
-   *
-   * @param anomalies list of anomalies
-   * @return list of anomalies, with grouped dimensions
-   */
-  abstract List<MergedAnomalyResultDTO> group(List<MergedAnomalyResultDTO> anomalies, InputData data);
-
-  @Override
-  public final List<MergedAnomalyResultDTO> group(List<MergedAnomalyResultDTO> anomalies, DataProvider provider) {
-    return this.group(anomalies, getDataForSpec(provider, this.getInputDataSpec()));
-  }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleDetectionStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleDetectionStage.java
index a9167c3..a5165dc 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleDetectionStage.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleDetectionStage.java
@@ -20,10 +20,10 @@ import com.linkedin.thirdeye.dataframe.BooleanSeries;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
-import com.linkedin.thirdeye.detection.annotation.Detection;
-import com.linkedin.thirdeye.detection.annotation.DetectionParam;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.annotation.Param;
 import com.linkedin.thirdeye.detection.annotation.DetectionTag;
 import com.linkedin.thirdeye.detection.annotation.PresentationOption;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
@@ -38,7 +38,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 /**
  * Simple threshold rule algorithm with (optional) upper and lower bounds on a metric value.
  */
-@Detection(name = "Threshold",
+@Components(title = "Threshold",
     type = "THRESHOLD",
     tags = { DetectionTag.RULE_DETECTION },
     description = "Simple threshold rule algorithm with (optional) upper and lower bounds on a metric value.",
@@ -47,7 +47,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
         description = "aggregated absolute value within a time period",
         template = "is lower than ${min} or higher than ${max}"
     )},
-    params = {@DetectionParam(name = "min", placeholder = "value"), @DetectionParam(name = "max", placeholder = "value")}
+    params = {@Param(name = "min", placeholder = "value"), @Param(name = "max", placeholder = "value")}
 )
 public class ThresholdRuleDetectionStage extends StaticAnomalyDetectionStage {
   private final String COL_TOO_HIGH = "tooHigh";
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleFilterStage.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleFilterStage.java
index 6a0bbbb..0888a1c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleFilterStage.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/ThresholdRuleFilterStage.java
@@ -20,8 +20,8 @@ import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.annotation.Detection;
-import com.linkedin.thirdeye.detection.annotation.DetectionParam;
+import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.annotation.Param;
 import com.linkedin.thirdeye.detection.annotation.DetectionTag;
 import com.linkedin.thirdeye.detection.annotation.PresentationOption;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
@@ -35,7 +35,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 /**
  * This threshold rule filter stage filters the anomalies if either the min or max thresholds do not pass.
  */
-@Detection(name = "Aggregate Threshold Filter",
+@Components(title = "Aggregate Threshold Filter",
     type = "THRESHOLD_RULE_FILTER",
     tags = {DetectionTag.RULE_FILTER},
     description = "Threshold rule filter. filters the anomalies if either the min or max thresholds do not satisfied.",
@@ -45,7 +45,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
             description = "aggregated absolute value within a time period",
             template = "is between ${min} and ${max}"
         )},
-    params = {@DetectionParam(name = "min", placeholder = "value"), @DetectionParam(name = "max", placeholder = "value")}
+    params = {@Param(name = "min", placeholder = "value"), @Param(name = "max", placeholder = "value")}
 )
 public class ThresholdRuleFilterStage implements AnomalyFilterStage {
     private static final String PROP_MIN = "min";
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Components.java
similarity index 83%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Components.java
index 0fd4537..905c8e5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Components.java
@@ -24,11 +24,15 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 
+/**
+ * Components annotation
+ * Components with this annotation will be registered and can therefore be configured from a YAML file.
+ */
 @Target({ElementType.TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 @Inherited
-public @interface Detection {
-  @JsonProperty String name() default "";
+public @interface Components {
+  @JsonProperty String title() default "";
 
   @JsonProperty DetectionTag[] tags() default {};
 
@@ -40,6 +44,6 @@ public @interface Detection {
 
   @JsonProperty PresentationOption[] presentation() default {};
 
-  @JsonProperty DetectionParam[] params() default {};
+  @JsonProperty Param[] params() default {};
 }
 
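
With the rename from Detection to Components (and name to title), a component now documents itself entirely through this annotation. A hypothetical annotated class might look like the sketch below; the class name and param are illustrative, and registration additionally requires implementing BaseComponent so that the registry scan (next file) can discover it:

    // Sketch only: how a component advertises itself to the registry and the YAML layer.
    @Components(title = "Example rule",
        type = "EXAMPLE_RULE",
        tags = {DetectionTag.RULE_DETECTION},
        description = "Illustrative component registered via the Components annotation.",
        params = {@Param(name = "threshold", placeholder = "value")})
    public class ExampleRuleComponent /* implements BaseComponent<...> */ {
      // implementation elided
    }
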
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java
index 6405fc6..f9ccf16 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionRegistry.java
@@ -16,9 +16,10 @@
 
 package com.linkedin.thirdeye.detection.annotation;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.reflect.ClassPath;
-import com.linkedin.thirdeye.detection.algorithm.stage.AnomalyDetectionStage;
+import com.linkedin.thirdeye.detection.spi.components.BaseComponent;
+import com.linkedin.thirdeye.detection.yaml.YamlDetectionConfigTranslator;
 import java.lang.annotation.Annotation;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.collections.MapUtils;
+import org.reflections.Reflections;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +36,12 @@ import org.slf4j.LoggerFactory;
  * The detection registry.
  */
 public class DetectionRegistry {
-
+  // component type to component class name and annotation
   private static final Map<String, Map> REGISTRY_MAP = new HashMap<>();
+  // component class name to tuner annotation
+  private static final Map<String, Tune> TUNE_MAP = new HashMap<>();
+  // yaml pipeline type to yaml converter class name
+  private static final Map<String, String> YAML_MAP = new HashMap<>();
   private static final Logger LOG = LoggerFactory.getLogger(DetectionRegistry.class);
   private static final String KEY_CLASS_NAME = "className";
   private static final String KEY_ANNOTATION = "annotation";
@@ -47,48 +53,91 @@ public class DetectionRegistry {
     return INSTANCE;
   }
 
+  public static void registerComponent(String className, String type) {
+    REGISTRY_MAP.put(type, ImmutableMap.of(KEY_CLASS_NAME, className));
+  }
+
   /**
-   * Internal constructor. Read the Detection annotation from each stage implementation.
+   * Read all the components, tune, and yaml annotations and initialize the registry.
    */
-  private DetectionRegistry() {
+  public static void init() {
     try {
-      Set<ClassPath.ClassInfo> classInfos = ClassPath.from(Thread.currentThread().getContextClassLoader())
-          .getTopLevelClasses(AnomalyDetectionStage.class.getPackage().getName());
-      for (ClassPath.ClassInfo classInfo : classInfos) {
-        Class clazz = Class.forName(classInfo.getName());
+      Reflections reflections = new Reflections();
+      // register components
+      Set<Class<? extends BaseComponent>> classes = reflections.getSubTypesOf(BaseComponent.class);
+      for (Class clazz : classes) {
+        String className = clazz.getName();
         for (Annotation annotation : clazz.getAnnotations()) {
-          if (annotation instanceof Detection) {
-            Detection detectionAnnotation = (Detection) annotation;
-            REGISTRY_MAP.put(detectionAnnotation.type(), ImmutableMap.of(KEY_CLASS_NAME, classInfo.getName(), KEY_ANNOTATION, detectionAnnotation));
+          if (annotation instanceof Components) {
+            Components componentsAnnotation = (Components) annotation;
+            REGISTRY_MAP.put(componentsAnnotation.type(),
+                ImmutableMap.of(KEY_CLASS_NAME, className, KEY_ANNOTATION, componentsAnnotation));
+          }
+          if (annotation instanceof Tune) {
+            Tune trainingAnnotation = (Tune) annotation;
+            TUNE_MAP.put(className, trainingAnnotation);
+          }
+        }
+      }
+      // register yaml translators
+      Set<Class<? extends YamlDetectionConfigTranslator>> yamlConverterClasses =
+          reflections.getSubTypesOf(YamlDetectionConfigTranslator.class);
+      for (Class clazz : yamlConverterClasses) {
+        for (Annotation annotation : clazz.getAnnotations()) {
+          if (annotation instanceof Yaml) {
+            YAML_MAP.put(((Yaml) annotation).pipelineType(), clazz.getName());
           }
         }
       }
     } catch (Exception e) {
-      LOG.warn("Build detection registry error", e);
+      LOG.warn("initialize detection registry error", e);
     }
   }
 
   private static final DetectionRegistry INSTANCE = new DetectionRegistry();
 
   /**
-   * Look up the class name for a given algorithm
+   * Look up the class name for a given component
    * @param type the type used in the YAML configs
-   * @return algorithm class name
+   * @return component class name
    */
   public String lookup(String type) {
+    Preconditions.checkArgument(REGISTRY_MAP.containsKey(type.toUpperCase()), type + " not found in registry");
     return MapUtils.getString(REGISTRY_MAP.get(type.toUpperCase()), KEY_CLASS_NAME);
   }
 
   /**
-   * Return all stage implementation annotations
-   * @return List of detection annotation
+   * Look up the tunable class name for a component class name
+   * @return tunable class name
+   */
+  public String lookupTunable(String className) {
+    Preconditions.checkArgument(TUNE_MAP.containsKey(className), className + " not found in registry");
+    return TUNE_MAP.get(className).tunable();
+  }
+
+  /**
+   * Look up the yaml converter class name for a pipeline type
+   * @return yaml converter class name
+   */
+  public String lookupYamlConverter(String pipelineType) {
+    Preconditions.checkArgument(YAML_MAP.containsKey(pipelineType), pipelineType + " not found in registry");
+    return YAML_MAP.get(pipelineType);
+  }
+
+  public boolean isTunable(String className) {
+    return TUNE_MAP.containsKey(className);
+  }
+
+  /**
+   * Return all component implementation annotations
+   * @return List of component annotation
    */
-  public List<Detection> getAllAnnotation() {
-    List<Detection> annotations = new ArrayList<>();
-    for (Map.Entry<String, Map> entry : REGISTRY_MAP.entrySet()){
+  public List<Components> getAllAnnotation() {
+    List<Components> annotations = new ArrayList<>();
+    for (Map.Entry<String, Map> entry : REGISTRY_MAP.entrySet()) {
       Map infoMap = entry.getValue();
-      if (infoMap.containsKey(KEY_ANNOTATION)){
-        annotations.add((Detection) infoMap.get(KEY_ANNOTATION));
+      if (infoMap.containsKey(KEY_ANNOTATION)) {
+        annotations.add((Components) infoMap.get(KEY_ANNOTATION));
       }
     }
     return annotations;
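
The registry is now populated by a classpath scan over BaseComponent subtypes (via org.reflections) rather than a package-local ClassPath walk, and it additionally tracks tuners and YAML translators. Callers are expected to invoke DetectionRegistry.init() once at startup and then resolve implementations by name. A minimal usage sketch; the "COMPOSITE" pipeline type is illustrative, while "THRESHOLD" is registered by the ThresholdRuleDetector added later in this commit:

    // Sketch: one-time initialization, then lookups by type / class name.
    DetectionRegistry.init();
    DetectionRegistry registry = DetectionRegistry.getInstance();

    String className = registry.lookup("THRESHOLD");              // component class name
    if (registry.isTunable(className)) {
      String tunerClassName = registry.lookupTunable(className);  // from the @Tune annotation
    }
    String translatorClassName = registry.lookupYamlConverter("COMPOSITE");  // illustrative type
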
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionParam.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Param.java
similarity index 97%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionParam.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Param.java
index 1aea7e5..9307a33 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/DetectionParam.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Param.java
@@ -25,7 +25,7 @@ import java.lang.annotation.Target;
 
 @Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD})
 @Retention(RetentionPolicy.RUNTIME)
-public @interface DetectionParam {
+public @interface Param {
 
   @JsonProperty String name() default "";
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Tune.java
similarity index 72%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Tune.java
index 0fd4537..47eea13 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Tune.java
@@ -24,22 +24,13 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 
+/**
+ * Tune annotation
+ * Components with this annotation register a tunable, which the yaml converter can call to generate the component spec.
+ */
 @Target({ElementType.TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 @Inherited
-public @interface Detection {
-  @JsonProperty String name() default "";
-
-  @JsonProperty DetectionTag[] tags() default {};
-
-  @JsonProperty String type() default "";
-
-  @JsonProperty String description() default "";
-
-  @JsonProperty boolean hidden() default false;
-
-  @JsonProperty PresentationOption[] presentation() default {};
-
-  @JsonProperty DetectionParam[] params() default {};
+public @interface Tune {
+  @JsonProperty String tunable() default "";
 }
-
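
A component opts into tuning by carrying @Tune with the fully qualified class name of its tuner; DetectionRegistry.init() records that mapping and lookupTunable(className) returns it. A sketch with a purely illustrative tuner class name:

    // Sketch: declaring a tunable component (the tuner class name is hypothetical).
    @Tune(tunable = "com.linkedin.thirdeye.detection.tuner.ExampleTuner")
    @Components(title = "Tunable example", type = "TUNABLE_EXAMPLE")
    public class TunableExampleComponent {
      // component logic elided
    }
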
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Yaml.java
similarity index 72%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Yaml.java
index 0fd4537..7967180 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Detection.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/annotation/Yaml.java
@@ -24,22 +24,13 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 
+/**
+ * Yaml annotation
+ * Registers the yaml translator to the translator factory.
+ */
 @Target({ElementType.TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 @Inherited
-public @interface Detection {
-  @JsonProperty String name() default "";
-
-  @JsonProperty DetectionTag[] tags() default {};
-
-  @JsonProperty String type() default "";
-
-  @JsonProperty String description() default "";
-
-  @JsonProperty boolean hidden() default false;
-
-  @JsonProperty PresentationOption[] presentation() default {};
-
-  @JsonProperty DetectionParam[] params() default {};
+public @interface Yaml {
+  @JsonProperty String pipelineType() default "";
 }
-
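
YAML translators register the same way: a YamlDetectionConfigTranslator subclass annotated with @Yaml is recorded under its pipelineType during DetectionRegistry.init(), and lookupYamlConverter(pipelineType) returns its class name. A sketch with an illustrative translator:

    // Sketch: registering a YAML translator for a pipeline type (class name is illustrative;
    // the abstract methods of YamlDetectionConfigTranslator would still need to be implemented).
    @Yaml(pipelineType = "EXAMPLE_PIPELINE")
    public class ExampleYamlTranslator extends YamlDetectionConfigTranslator {
      // translation logic elided
    }
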
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProvider.java
deleted file mode 100644
index 00fa864..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProvider.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.linkedin.thirdeye.detection.baseline;
-
-import com.linkedin.thirdeye.dataframe.DataFrame;
-import com.linkedin.thirdeye.dataframe.util.MetricSlice;
-import com.linkedin.thirdeye.detection.DataProvider;
-import java.util.Collection;
-import java.util.Map;
-
-
-/**
- * The baseline provider in the detection framework. Lower level interface with data provider.
- */
-public interface BaselineProvider {
-  /**
-   * Initialize the baseline provider
-   * @param properties the properties of this baseline provider.
-   */
-  void init(Map<String, Object> properties);
-
-  /**
-   * Compute the baseline time series for the collection of metric slices.
-   * @param slices the metric slices
-   * @param provider the data source for time series, aggregates, anomalies, etc.
-   * @return the mapping of the metric slice to its time series data frame.
-   */
-  Map<MetricSlice, DataFrame> computeBaselineTimeSeries(Collection<MetricSlice> slices, DataProvider provider);
-
-  /**
-   * Compute the baseline time series for the collection of metric slices.
-   * @param slices the metric slices
-   * @param provider the data source for time series, aggregates, anomalies, etc.
-   * @return the mapping of the metric slice to its aggregate value.
-   */
-  Map<MetricSlice, Double> computeBaselineAggregates(Collection<MetricSlice> slices, DataProvider provider);
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/RuleBaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/RuleBaselineProvider.java
deleted file mode 100644
index 0a67a53..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/RuleBaselineProvider.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.linkedin.thirdeye.detection.baseline;
-
-import com.linkedin.thirdeye.dashboard.resources.v2.BaselineParsingUtils;
-import com.linkedin.thirdeye.dataframe.DataFrame;
-import com.linkedin.thirdeye.dataframe.LongSeries;
-import com.linkedin.thirdeye.dataframe.util.MetricSlice;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
-import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.MapUtils;
-
-import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-/**
- * Baseline provider for rule detection.
- * Supports woXw, meanXw, medianXw, etc.
- *
- * @see BaselineParsingUtils#parseOffset(String, String) for list of all supported offsets
- */
-public class RuleBaselineProvider extends StaticBaselineProvider {
-  private Baseline baseline;
-  private String timezone;
-  private String offset;
-
-  @Override
-  public void init(Map<String, Object> properties) {
-    super.init(properties);
-    this.offset = MapUtils.getString(properties, "offset", "current");
-    this.timezone = MapUtils.getString(properties, "timezone", "UTC");
-    this.baseline = BaselineParsingUtils.parseOffset(this.offset, this.timezone);
-  }
-
-  @Override
-  InputDataSpec getInputDataSpec(Collection<MetricSlice> slices) {
-    List<MetricSlice> timeSeriesSlices = new ArrayList<>();
-    for (MetricSlice slice : slices) {
-      timeSeriesSlices.addAll(this.baseline.scatter(slice));
-    }
-    return new InputDataSpec().withTimeseriesSlices(timeSeriesSlices);
-  }
-
-  @Override
-  public Map<MetricSlice, DataFrame> computeBaselineTimeSeries(Collection<MetricSlice> slices, InputData data) {
-    Map<MetricSlice, DataFrame> result = new HashMap<>();
-    for (MetricSlice slice : slices) {
-      result.put(slice, this.baseline.gather(slice, data.getTimeseries()));
-    }
-    return result;
-  }
-
-  @Override
-  InputDataSpec getAggregateInputDataSpec(Collection<MetricSlice> slices) {
-    List<MetricSlice> timeSeriesSlices = new ArrayList<>();
-    for (MetricSlice slice : slices) {
-      timeSeriesSlices.addAll(this.baseline.scatter(slice));
-    }
-    return new InputDataSpec().withAggregateSlices(timeSeriesSlices);
-  }
-
-  @Override
-  public Map<MetricSlice, Double> computeBaselineAggregates(Collection<MetricSlice> slices, InputData data) {
-    Map<MetricSlice, Double> result = new HashMap<>();
-    for (MetricSlice slice : slices) {
-      double value;
-      try {
-        value = data.getAggregates().get(this.baseline.scatter(slice).get(0)).getDouble(COL_VALUE, 0);
-      } catch (Exception e) {
-        value = Double.NaN;
-      }
-      result.put(slice, value);
-    }
-    return result;
-  }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/StaticBaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/StaticBaselineProvider.java
deleted file mode 100644
index 9094000..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/StaticBaselineProvider.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.linkedin.thirdeye.detection.baseline;
-
-import com.linkedin.thirdeye.dataframe.DataFrame;
-import com.linkedin.thirdeye.dataframe.DoubleSeries;
-import com.linkedin.thirdeye.dataframe.Series;
-import com.linkedin.thirdeye.dataframe.util.MetricSlice;
-import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.InputData;
-import com.linkedin.thirdeye.detection.InputDataSpec;
-import com.linkedin.thirdeye.detection.algorithm.stage.StageUtils;
-import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.collections.MapUtils;
-
-import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-/**
- * The static baseline provider. Higher level interfaces.
- */
-public abstract class StaticBaselineProvider implements BaselineProvider {
-  Series.DoubleFunction aggregationFunction = DoubleSeries.MEAN;
-
-  @Override
-  public void init(Map<String, Object> properties) {
-    this.aggregationFunction = BaselineAggregateType.valueOf(MapUtils.getString(properties, "metricFunction", "MEAN")).getFunction();
-  }
-
-  /**
-   * compute baseline times eries for the metric slices.
-   * The DataFrame should includes one Long series called COL_TIME = "timestamp" and a double series called COL_VALUE = "value"
-   * @param slices slices to compute the baseline time series for.
-   * @param data input data as defined by getInputDataSpec method.
-   * @return the mapping of the metric slice to its time series data frame.
-   */
-  public abstract Map<MetricSlice, DataFrame> computeBaselineTimeSeries(Collection<MetricSlice> slices, InputData data);
-
-  /**
-   * The input data spec to describe what data fetch. The data will be feed in to the
-   * @see StaticBaselineProvider#computeBaselineTimeSeries method
-   * @param slices slices to compute the baseline time series for.
-   * @return the input data spec to describe what time series, anomalies, etc. to fetch.
-   */
-  abstract InputDataSpec getInputDataSpec(Collection<MetricSlice> slices);
-
-  /**
-   * To compute the baseline aggregate values for each metric slice. Optionally override this method.
-   * By default calls compute baseline time series then aggregate to one value based on the aggregation function.
-   * The input data are defined by
-   * @see StaticBaselineProvider#getAggregateInputDataSpec(Collection<MetricSlice>) method.
-   *
-   * @param slices the metric slices
-   * @param data input data as defined by getAggregateInputDataSpec method.
-   * @return the mapping of metric slice to its aggregate values
-   */
-  public Map<MetricSlice, Double> computeBaselineAggregates(Collection<MetricSlice> slices, InputData data) {
-    Map<MetricSlice, DataFrame> baselineTimeSeries = this.computeBaselineTimeSeries(slices, data);
-    Map<MetricSlice, Double> baselineAggregates = new HashMap<>();
-    for (MetricSlice slice : slices) {
-      DataFrame ts = baselineTimeSeries.get(slice);
-      baselineAggregates.put(slice, ts.getDoubles(COL_VALUE).aggregate(this.aggregationFunction).getDouble(0));
-    }
-    return baselineAggregates;
-  }
-
-  /**
-   * The input data spec to describe what data fetch. The data will be feed in to the
-   * @see StaticBaselineProvider#computeBaselineAggregates method.
-   * Optionally override this method. By default calls the
-   * @see StaticBaselineProvider#getInputDataSpec method to get a data spec.
-   * @param slices slices to compute the baseline time series for.
-   * @return the input data spec to describe what time series, anomalies, etc. for the framework to fetch.
-   */
-  InputDataSpec getAggregateInputDataSpec(Collection<MetricSlice> slices) {
-    return this.getInputDataSpec(slices);
-  }
-
-  @Override
-  public final Map<MetricSlice, DataFrame> computeBaselineTimeSeries(Collection<MetricSlice> slices, DataProvider provider) {
-    return this.computeBaselineTimeSeries(slices, StageUtils.getDataForSpec(provider, this.getInputDataSpec(slices)));
-  }
-
-  @Override
-  public final Map<MetricSlice, Double> computeBaselineAggregates(Collection<MetricSlice> slices, DataProvider provider) {
-    return this.computeBaselineAggregates(slices, StageUtils.getDataForSpec(provider, this.getAggregateInputDataSpec(slices)));
-  }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/RuleBaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/RuleBaselineProvider.java
new file mode 100644
index 0000000..2e56b6a
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/RuleBaselineProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.components;
+
+import com.linkedin.thirdeye.dashboard.resources.v2.BaselineParsingUtils;
+import com.linkedin.thirdeye.dataframe.Series;
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
+import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import com.linkedin.thirdeye.detection.spi.model.TimeSeries;
+import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
+
+import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
+
+@Components(title = "rule baseline",
+    type = "RULE_BASELINE"
+)
+public class RuleBaselineProvider implements BaselineProvider<RuleBaselineProviderSpec> {
+  private Baseline baseline;
+  private String timezone;
+  private String offset;
+  private InputDataFetcher dataFetcher;
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec().withTimeseriesSlices(this.baseline.scatter(slice)));
+    return TimeSeries.fromDataFrame(this.baseline.gather(slice, data.getTimeseries()));
+  }
+
+  @Override
+  public Double computePredictedAggregates(MetricSlice slice, Series.DoubleFunction aggregateFunction) {
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(this.baseline.scatter(slice)));
+    double value;
+    try {
+      value = data.getAggregates().get(this.baseline.scatter(slice).get(0)).getDouble(COL_VALUE, 0);
+    } catch (Exception e) {
+      value = Double.NaN;
+    }
+    return value;
+  }
+
+
+  @Override
+  public void init(RuleBaselineProviderSpec spec, InputDataFetcher dataFetcher) {
+    this.offset = spec.getOffset();
+    this.timezone = spec.getTimezone();
+    this.baseline = BaselineParsingUtils.parseOffset(this.offset, this.timezone);
+    this.dataFetcher = dataFetcher;
+  }
+}
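
For context, this is the component-style replacement for the deleted baseline RuleBaselineProvider: it is initialized with a typed spec plus an InputDataFetcher and queried per metric slice. A minimal usage sketch; only getOffset()/getTimezone() appear in this commit, so the spec setters below are assumptions:

    // Sketch: wiring the rule baseline provider by hand (normally done by the framework).
    RuleBaselineProviderSpec spec = new RuleBaselineProviderSpec();
    spec.setOffset("wo1w");    // assumed setter; week-over-week offset
    spec.setTimezone("UTC");   // assumed setter

    RuleBaselineProvider baseline = new RuleBaselineProvider();
    baseline.init(spec, new InputDataFetcher(provider, configId));

    TimeSeries predicted = baseline.computePredictedTimeSeries(slice);
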
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
new file mode 100644
index 0000000..cc6f8cc
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.components;
+
+import com.linkedin.thirdeye.dataframe.DataFrame;
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.annotation.DetectionTag;
+import com.linkedin.thirdeye.detection.annotation.Param;
+import com.linkedin.thirdeye.detection.annotation.PresentationOption;
+import com.linkedin.thirdeye.detection.spec.ThresholdRuleFilterSpec;
+import com.linkedin.thirdeye.detection.spi.components.AnomalyFilter;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
+import java.util.Collections;
+import java.util.Map;
+
+import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+/**
+ * This threshold rule filter filters out anomalies if either the min or max threshold check does not pass.
+ */
+@Components(title = "Aggregate Threshold Filter", type = "THRESHOLD_RULE_FILTER", tags = {
+    DetectionTag.RULE_FILTER}, description = "Threshold rule filter. Filters the anomalies if either the min or max threshold is not satisfied.", presentation = {
+    @PresentationOption(name = "absolute value", description = "aggregated absolute value within a time period", template = "is between ${min} and ${max}")}, params = {
+    @Param(name = "min", placeholder = "value"), @Param(name = "max", placeholder = "value")})
+public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFilterSpec> {
+  private double min;
+  private double max;
+  private InputDataFetcher dataFetcher;
+
+  @Override
+  public boolean isQualified(MergedAnomalyResultDTO anomaly) {
+    MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
+    MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
+    InputData data = dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(Collections.singleton(currentSlice)));
+
+    Map<MetricSlice, DataFrame> aggregates = data.getAggregates();
+    double currentValue = getValueFromAggregates(currentSlice, aggregates);
+    if (!Double.isNaN(this.min) && currentValue < this.min) {
+      return false;
+    }
+    if (!Double.isNaN(this.max) && currentValue > this.max) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void init(ThresholdRuleFilterSpec spec, InputDataFetcher dataFetcher) {
+    this.min = spec.getMin();
+    this.max = spec.getMax();
+    this.dataFetcher = dataFetcher;
+  }
+
+  double getValueFromAggregates(MetricSlice slice, Map<MetricSlice, DataFrame> aggregates) {
+    return aggregates.get(slice).getDouble(COL_VALUE, 0);
+  }
+}
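
The filter follows the same component lifecycle: init with its spec and a data fetcher, then isQualified per anomaly. A minimal sketch; as above, the spec setters are assumptions since only getMin()/getMax() are shown:

    // Sketch: applying the threshold filter to a candidate anomaly.
    ThresholdRuleFilterSpec spec = new ThresholdRuleFilterSpec();
    spec.setMin(100.0);     // assumed setter
    spec.setMax(10000.0);   // assumed setter

    ThresholdRuleAnomalyFilter filter = new ThresholdRuleAnomalyFilter();
    filter.init(spec, new InputDataFetcher(provider, configId));

    boolean keep = filter.isQualified(anomaly);  // false if the aggregate falls outside [min, max]
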
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/ThresholdRuleDetector.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/ThresholdRuleDetector.java
new file mode 100644
index 0000000..6ca9f0c
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/ThresholdRuleDetector.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.components;
+
+import com.linkedin.thirdeye.dataframe.BooleanSeries;
+import com.linkedin.thirdeye.dataframe.DataFrame;
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.annotation.DetectionTag;
+import com.linkedin.thirdeye.detection.annotation.Param;
+import com.linkedin.thirdeye.detection.annotation.PresentationOption;
+import com.linkedin.thirdeye.detection.spec.ThresholdRuleDetectorSpec;
+import com.linkedin.thirdeye.detection.spi.components.AnomalyDetector;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import com.linkedin.thirdeye.detection.DetectionUtils;
+import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
+import java.util.Collections;
+import java.util.List;
+import org.joda.time.Interval;
+
+import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+@Components(title = "Threshold", type = "THRESHOLD", tags = {
+    DetectionTag.RULE_DETECTION}, description = "Simple threshold rule algorithm with (optional) upper and lower bounds on a metric value.", presentation = {
+    @PresentationOption(name = "absolute value", description = "aggregated absolute value within a time period", template = "is lower than ${min} or higher than ${max}")}, params = {
+    @Param(name = "min", placeholder = "value"), @Param(name = "max", placeholder = "value")})
+public class ThresholdRuleDetector implements AnomalyDetector<ThresholdRuleDetectorSpec> {
+  private static final String COL_TOO_HIGH = "tooHigh";
+  private static final String COL_TOO_LOW = "tooLow";
+  private static final String COL_ANOMALY = "anomaly";
+
+  private double min;
+  private double max;
+  private InputDataFetcher dataFetcher;
+
+  @Override
+  public List<MergedAnomalyResultDTO> runDetection(Interval window, String metricUrn) {
+    MetricEntity me = MetricEntity.fromURN(metricUrn);
+    long endTime = window.getEndMillis();
+    MetricSlice slice = MetricSlice.from(me.getId(), window.getStartMillis(), endTime, me.getFilters());
+
+    InputData data = this.dataFetcher.fetchData(
+        new InputDataSpec().withTimeseriesSlices(Collections.singletonList(slice))
+            .withMetricIdsForDataset(Collections.singletonList(me.getId())));
+    DataFrame df = data.getTimeseries().get(slice);
+
+    // defaults
+    df.addSeries(COL_TOO_HIGH, BooleanSeries.fillValues(df.size(), false));
+    df.addSeries(COL_TOO_LOW, BooleanSeries.fillValues(df.size(), false));
+
+    // max
+    if (!Double.isNaN(this.max)) {
+      df.addSeries(COL_TOO_HIGH, df.getDoubles(COL_VALUE).gt(this.max));
+    }
+
+    // min
+    if (!Double.isNaN(this.min)) {
+      df.addSeries(COL_TOO_LOW, df.getDoubles(COL_VALUE).lt(this.min));
+    }
+
+    df.mapInPlace(BooleanSeries.HAS_TRUE, COL_ANOMALY, COL_TOO_HIGH, COL_TOO_LOW);
+
+    DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+    return DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY, endTime, datasetConfig);
+  }
+
+  @Override
+  public void init(ThresholdRuleDetectorSpec spec, InputDataFetcher dataFetcher) {
+    this.min = spec.getMin();
+    this.max = spec.getMax();
+    this.dataFetcher = dataFetcher;
+  }
+}
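
Similarly, a minimal driver for the detector component; this is only a hedged sketch of the init/runDetection contract (the AnomalyDetectorWrapper later in this commit is what actually drives it, and the helper name and threshold value here are made up).

    import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
    import com.linkedin.thirdeye.detection.InputDataFetcher;
    import com.linkedin.thirdeye.detection.components.ThresholdRuleDetector;
    import com.linkedin.thirdeye.detection.spec.ThresholdRuleDetectorSpec;
    import java.util.List;
    import org.joda.time.Interval;

    public class ThresholdDetectorSketch {
      // Hypothetical driver: configures a max-only threshold and runs one detection window.
      static List<MergedAnomalyResultDTO> detect(InputDataFetcher dataFetcher, String metricUrn,
          long startMillis, long endMillis) {
        ThresholdRuleDetectorSpec spec = new ThresholdRuleDetectorSpec();
        spec.setMax(5000.0);  // min is left as NaN, i.e. no lower bound
        ThresholdRuleDetector detector = new ThresholdRuleDetector();
        detector.init(spec, dataFetcher);
        return detector.runDetection(new Interval(startMillis, endMillis), metricUrn);
      }
    }
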
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithm.java
index d4699b0..f704fa6 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithm.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithm.java
@@ -33,7 +33,7 @@ import com.linkedin.thirdeye.datasource.loader.AggregationLoader;
 import com.linkedin.thirdeye.datasource.loader.DefaultAggregationLoader;
 import com.linkedin.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
 import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DefaultDataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
@@ -99,8 +99,8 @@ public class GridSearchTuningAlgorithm implements TuningAlgorithm {
    * @throws Exception the exception
    */
   @Override
-  public void fit(AnomalySlice slice) throws Exception {
-    Collection<MergedAnomalyResultDTO> testAnomalies = this.provider.fetchAnomalies(Collections.singletonList(slice)).get(slice);
+  public void fit(AnomalySlice slice, long configId) throws Exception {
+    Collection<MergedAnomalyResultDTO> testAnomalies = this.provider.fetchAnomalies(Collections.singletonList(slice), configId).get(slice);
     fit(slice, new HashMap<String, Number>(), testAnomalies);
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/TuningAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/TuningAlgorithm.java
index d31bce1..9bb5ab8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/TuningAlgorithm.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/TuningAlgorithm.java
@@ -17,7 +17,7 @@
 package com.linkedin.thirdeye.detection.finetune;
 
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 
 
 /**
@@ -30,7 +30,7 @@ public interface TuningAlgorithm {
    * @param slice anomaly slice
    * @throws Exception the exception
    */
-  void fit(AnomalySlice slice) throws Exception;
+  void fit(AnomalySlice slice, long configId) throws Exception;
 
   /**
    * Return the best detection config detection config dto.
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProviderLoader.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/AbstractSpec.java
similarity index 53%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProviderLoader.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/AbstractSpec.java
index bba334a..015dfeb 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProviderLoader.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/AbstractSpec.java
@@ -14,21 +14,20 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.baseline;
+package com.linkedin.thirdeye.detection.spec;
 
-import java.lang.reflect.Constructor;
 import java.util.Map;
+import org.modelmapper.ModelMapper;
 
 
-public class BaselineProviderLoader {
-  private static final String PROP_CLASS_NAME = "className";
+/**
+ * Base class for component specs
+ */
+public abstract class AbstractSpec {
 
-  public static BaselineProvider from(Map<String, Object> properties) throws Exception {
-    String className = properties.get(PROP_CLASS_NAME).toString();
-    Constructor<?>
-        constructor = Class.forName(className).getConstructor();
-    BaselineProvider baselineProvider = (BaselineProvider) constructor.newInstance();
-    baselineProvider.init(properties);
-    return baselineProvider;
+  public static <T extends AbstractSpec> T fromProperties(Map<String, Object> properties, Class<T> specClass) {
+    // don't reuse model mapper instance. It caches typeMaps and will result in unexpected mappings
+    ModelMapper modelMapper = new ModelMapper();
+    return modelMapper.map(properties, specClass);
   }
 }
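
The fromProperties helper is what lets component specs stay plain POJOs: the raw properties map from the detection config is mapped field by field onto the typed spec. A small sketch of that mapping (the property values are made up):

    import com.google.common.collect.ImmutableMap;
    import com.linkedin.thirdeye.detection.spec.AbstractSpec;
    import com.linkedin.thirdeye.detection.spec.ThresholdRuleDetectorSpec;
    import java.util.Map;

    public class SpecMappingSketch {
      public static void main(String[] args) {
        // Component parameters as they might appear under a component's params in the config.
        Map<String, Object> properties = ImmutableMap.<String, Object>of("min", 100.0, "max", 5000.0);
        // ModelMapper copies matching keys onto the typed spec.
        ThresholdRuleDetectorSpec spec = AbstractSpec.fromProperties(properties, ThresholdRuleDetectorSpec.class);
        System.out.println(spec.getMin() + " .. " + spec.getMax());
      }
    }
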
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/RuleBaselineProviderSpec.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/RuleBaselineProviderSpec.java
new file mode 100644
index 0000000..9327727
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/RuleBaselineProviderSpec.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RuleBaselineProviderSpec extends AbstractSpec {
+  private String timezone = "UTC";
+  private String offset = "wo1w";
+
+  public RuleBaselineProviderSpec(String timezone, String offset) {
+    this.timezone = timezone;
+    this.offset = offset;
+  }
+
+  public RuleBaselineProviderSpec() {
+  }
+
+  public String getTimezone() {
+    return timezone;
+  }
+
+  public void setTimezone(String timezone) {
+    this.timezone = timezone;
+  }
+
+  public String getOffset() {
+    return offset;
+  }
+
+  public void setOffset(String offset) {
+    this.offset = offset;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/ThresholdRuleDetectorSpec.java
similarity index 56%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/ThresholdRuleDetectorSpec.java
index 8562868..0eacb6a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/ThresholdRuleDetectorSpec.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,11 +14,29 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.datalayer.dto;
+package com.linkedin.thirdeye.detection.spec;
 
-import com.linkedin.thirdeye.datalayer.pojo.DetectionConfigBean;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 
 
-public class DetectionConfigDTO extends DetectionConfigBean {
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ThresholdRuleDetectorSpec extends AbstractSpec {
+  private double min = Double.NaN;
+  private double max = Double.NaN;
 
+  public double getMin() {
+    return min;
+  }
+
+  public void setMin(double min) {
+    this.min = min;
+  }
+
+  public double getMax() {
+    return max;
+  }
+
+  public void setMax(double max) {
+    this.max = max;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
similarity index 56%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
index 8562868..8b975d8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,11 +14,29 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.datalayer.dto;
+package com.linkedin.thirdeye.detection.spec;
 
-import com.linkedin.thirdeye.datalayer.pojo.DetectionConfigBean;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 
 
-public class DetectionConfigDTO extends DetectionConfigBean {
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ThresholdRuleFilterSpec extends AbstractSpec {
+  private double min = Double.NaN;
+  private double max = Double.NaN;
 
+  public double getMin() {
+    return min;
+  }
+
+  public void setMin(double min) {
+    this.min = min;
+  }
+
+  public double getMax() {
+    return max;
+  }
+
+  public void setMax(double max) {
+    this.max = max;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/TuningAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/AnomalyDetector.java
similarity index 50%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/TuningAlgorithm.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/AnomalyDetector.java
index d31bce1..d558a58 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/finetune/TuningAlgorithm.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/AnomalyDetector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,28 +14,21 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.finetune;
+package com.linkedin.thirdeye.detection.spi.components;
 
-import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import java.util.List;
+import org.joda.time.Interval;
 
 
-/**
- * The interface Tuning algorithm.
- */
-public interface TuningAlgorithm {
+public interface AnomalyDetector<T extends AbstractSpec> extends BaseComponent<T> {
   /**
-   * Fit the time series and anomalies between start and end time stamps, and score the detection configs.
-   *
-   * @param slice anomaly slice
-   * @throws Exception the exception
+   * Run detection in the specified time range and return a list of anomalies.
+   *
+   * @param window the time window to run detection on
+   * @param metricUrn the URN of the metric to run detection on
+   * @return list of detected anomalies
    */
-  void fit(AnomalySlice slice) throws Exception;
+  List<MergedAnomalyResultDTO> runDetection(Interval window, String metricUrn);
 
-  /**
-   * Return the best detection config detection config dto.
-   *
-   * @return the detection config dto
-   */
-  DetectionConfigDTO bestDetectionConfig();
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/AnomalyFilter.java
similarity index 57%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/AnomalyFilter.java
index 8562868..64e1b65 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/AnomalyFilter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,11 +14,16 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.datalayer.dto;
+package com.linkedin.thirdeye.detection.spi.components;
 
-import com.linkedin.thirdeye.datalayer.pojo.DetectionConfigBean;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
 
 
-public class DetectionConfigDTO extends DetectionConfigBean {
-
+public interface AnomalyFilter<T extends AbstractSpec> extends BaseComponent<T> {
+  /**
+   * Check whether an anomaly qualifies to pass this filter.
+   *
+   * @param anomaly the anomaly to check
+   * @return true if the anomaly passes the filter and should be kept, false if it should be filtered out
+   */
+  boolean isQualified(MergedAnomalyResultDTO anomaly);
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/BaseComponent.java
similarity index 69%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/BaseComponent.java
index 8562868..63de280 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/BaseComponent.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,11 +14,12 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.datalayer.dto;
+package com.linkedin.thirdeye.detection.spi.components;
 
-import com.linkedin.thirdeye.datalayer.pojo.DetectionConfigBean;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
 
 
-public class DetectionConfigDTO extends DetectionConfigBean {
-
+public interface BaseComponent<T extends AbstractSpec> {
+  void init(T spec, InputDataFetcher dataFetcher);
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/BaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/BaselineProvider.java
new file mode 100644
index 0000000..dd09b1a
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/BaselineProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.spi.components;
+
+import com.linkedin.thirdeye.dataframe.Series;
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
+import com.linkedin.thirdeye.detection.spi.model.TimeSeries;
+
+
+/**
+ * The baseline provider to calculate predicted baseline.
+ */
+public interface BaselineProvider<T extends AbstractSpec> extends BaseComponent<T> {
+  /**
+   * Compute the baseline time series for the metric slice.
+   * @param slice the metric slice to compute the baseline for
+   * @return the time series containing the predicted baseline.
+   */
+  TimeSeries computePredictedTimeSeries(MetricSlice slice);
+
+  /**
+   * Compute the aggregated baseline value for the metric slice.
+   * The default implementation calls computePredictedTimeSeries and aggregates the result with the given aggregate function.
+   * @param slice the metric slice to compute the baseline aggregate for
+   * @param aggregateFunction the function used to aggregate the predicted baseline series
+   * @return the predicted aggregate value.
+   */
+  default Double computePredictedAggregates(MetricSlice slice, Series.DoubleFunction aggregateFunction) {
+    TimeSeries baselineTimeSeries = this.computePredictedTimeSeries(slice);
+    return baselineTimeSeries.getPredictedBaseline().aggregate(aggregateFunction).getDouble(0);
+  }
+}
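
To make the contract concrete, a hypothetical provider that predicts a flat baseline. The series-building calls mirror the dataframe API used elsewhere in this commit and should be treated as assumptions; this is not the shipped RuleBaselineProvider.

    import com.linkedin.thirdeye.dataframe.DoubleSeries;
    import com.linkedin.thirdeye.dataframe.LongSeries;
    import com.linkedin.thirdeye.dataframe.util.MetricSlice;
    import com.linkedin.thirdeye.detection.InputDataFetcher;
    import com.linkedin.thirdeye.detection.spec.AbstractSpec;
    import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
    import com.linkedin.thirdeye.detection.spi.model.TimeSeries;

    // Hypothetical provider: predicts the same constant value at the start and end of the slice.
    public class ConstantBaselineProvider implements BaselineProvider<AbstractSpec> {
      private static final double CONSTANT = 0.0;

      @Override
      public void init(AbstractSpec spec, InputDataFetcher dataFetcher) {
        // nothing to configure for this sketch
      }

      @Override
      public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
        TimeSeries ts = new TimeSeries();
        ts.addTimeStamps(LongSeries.buildFrom(slice.getStart(), slice.getEnd()));
        ts.addPredictedBaseline(DoubleSeries.buildFrom(CONSTANT, CONSTANT));
        return ts;
      }
      // computePredictedAggregates() needs no override: the default method aggregates the predicted
      // series with whatever Series.DoubleFunction the caller passes in (e.g. a mean function).
    }
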
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Grouper.java
similarity index 52%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Grouper.java
index 8562868..0957e99 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/dto/DetectionConfigDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Grouper.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,11 +14,21 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.datalayer.dto;
+package com.linkedin.thirdeye.detection.spi.components;
 
-import com.linkedin.thirdeye.datalayer.pojo.DetectionConfigBean;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import java.util.List;
 
 
-public class DetectionConfigDTO extends DetectionConfigBean {
+public interface Grouper<T extends AbstractSpec> extends BaseComponent<T> {
+  /**
+   * Group a list of anomalies.
+   *
+   * @param anomalies the anomalies to group
+   * @return the list of anomalies with grouped dimensions
+   */
+  List<MergedAnomalyResultDTO> group(List<MergedAnomalyResultDTO> anomalies);
 
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProviderLoader.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
similarity index 50%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProviderLoader.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
index bba334a..780cb2f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/baseline/BaselineProviderLoader.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
@@ -14,21 +14,22 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.baseline;
+package com.linkedin.thirdeye.detection.spi.components;
 
-import java.lang.reflect.Constructor;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
+import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import java.util.Map;
+import org.joda.time.Interval;
 
-
-public class BaselineProviderLoader {
-  private static final String PROP_CLASS_NAME = "className";
-
-  public static BaselineProvider from(Map<String, Object> properties) throws Exception {
-    String className = properties.get(PROP_CLASS_NAME).toString();
-    Constructor<?>
-        constructor = Class.forName(className).getConstructor();
-    BaselineProvider baselineProvider = (BaselineProvider) constructor.newInstance();
-    baselineProvider.init(properties);
-    return baselineProvider;
-  }
+/**
+ * The tunable. Tunes the spec of a component over a given training window.
+ */
+public interface Tunable<T extends AbstractSpec> extends BaseComponent<T> {
+  /**
+   * Returns the new (tuned) spec for the component being tuned.
+   * @param currentSpec the current spec for the component; empty if it does not exist yet
+   * @param trainingWindow the time window to tune on
+   * @return the tuned spec for the component
+   */
+  Map<String, Object> tune(Map<String, Object> currentSpec, Interval trainingWindow);
 }
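
A toy Tunable, just to show the shape of the contract; the tuning rule here is made up, and a real tunable would fit against data fetched for the training window.

    import com.linkedin.thirdeye.detection.InputDataFetcher;
    import com.linkedin.thirdeye.detection.spec.AbstractSpec;
    import com.linkedin.thirdeye.detection.spi.components.Tunable;
    import java.util.HashMap;
    import java.util.Map;
    import org.joda.time.Interval;

    // Hypothetical tunable: widens the "max" threshold of a detector spec by 10% on every re-train.
    public class RelaxingThresholdTunable implements Tunable<AbstractSpec> {
      @Override
      public void init(AbstractSpec spec, InputDataFetcher dataFetcher) {
        // no state needed for this sketch
      }

      @Override
      public Map<String, Object> tune(Map<String, Object> currentSpec, Interval trainingWindow) {
        Map<String, Object> newSpec = new HashMap<>(currentSpec);
        double max = currentSpec.containsKey("max") ? ((Number) currentSpec.get("max")).doubleValue() : 1000.0;
        newSpec.put("max", max * 1.1);
        return newSpec;
      }
    }
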
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/AnomalySlice.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/AnomalySlice.java
similarity index 69%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/AnomalySlice.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/AnomalySlice.java
index 9b60d2f..3ea56a1 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/AnomalySlice.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/AnomalySlice.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection;
+package com.linkedin.thirdeye.detection.spi.model;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
@@ -26,29 +26,22 @@ import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
  * dimension filters.
  */
 public class AnomalySlice {
-  final long configId;
   final long start;
   final long end;
   final Multimap<String, String> filters;
 
   public AnomalySlice() {
-    this.configId = -1;
     this.start = -1;
     this.end = -1;
     this.filters = ArrayListMultimap.create();
   }
 
-  public AnomalySlice(long configId, long start, long end, Multimap<String, String> filters) {
-    this.configId = configId;
+  public AnomalySlice(long start, long end, Multimap<String, String> filters) {
     this.start = start;
     this.end = end;
     this.filters = filters;
   }
 
-  public long getConfigId() {
-    return configId;
-  }
-
   public long getStart() {
     return start;
   }
@@ -61,24 +54,16 @@ public class AnomalySlice {
     return filters;
   }
 
-  public AnomalySlice withConfigId(long configId) {
-    return new AnomalySlice(configId, this.start, this.end, this.filters);
-  }
-
-  public AnomalySlice withConfigId(Long configId) {
-    return new AnomalySlice(configId != null ? configId : -1, this.start, this.end, this.filters);
-  }
-
   public AnomalySlice withStart(long start) {
-    return new AnomalySlice(this.configId, start, this.end, this.filters);
+    return new AnomalySlice(start, this.end, this.filters);
   }
 
   public AnomalySlice withEnd(long end) {
-    return new AnomalySlice(this.configId, this.start, end, this.filters);
+    return new AnomalySlice(this.start, end, this.filters);
   }
 
   public AnomalySlice withFilters(Multimap<String, String> filters) {
-    return new AnomalySlice(this.configId, this.start, this.end, filters);
+    return new AnomalySlice(this.start, this.end, filters);
   }
 
   public boolean match(MergedAnomalyResultDTO anomaly) {
@@ -86,8 +71,6 @@ public class AnomalySlice {
       return false;
     if (this.end >= 0 && anomaly.getStartTime() >= this.end)
       return false;
-    if (this.configId >= 0 && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != this.configId))
-      return false;
 
     for (String dimName : this.filters.keySet()) {
       if (anomaly.getDimensions().containsKey(dimName)) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/EventSlice.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/EventSlice.java
similarity index 97%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/EventSlice.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/EventSlice.java
index d6ee907..eda21f9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/EventSlice.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/EventSlice.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection;
+package com.linkedin.thirdeye.detection.spi.model;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputData.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
similarity index 55%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputData.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
index b226653..adb51cc 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputData.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,13 +14,16 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection;
+package com.linkedin.thirdeye.detection.spi.model;
 
 import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.EventDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import java.util.Collections;
 import java.util.Map;
 
 
@@ -33,15 +36,33 @@ public class InputData {
   final Map<MetricSlice, DataFrame> aggregates;
   final Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies;
   final Multimap<EventSlice, EventDTO> events;
+  final Map<Long, MetricConfigDTO> metrics;
+  final Map<String, DatasetConfigDTO> datasets;
+  final Map<Long, DatasetConfigDTO> datasetForMetricId;
+
+  public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map<MetricSlice, DataFrame> aggregates,
+      Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies, Multimap<EventSlice, EventDTO> events) {
+    this.dataSpec = spec;
+    this.timeseries = timeseries;
+    this.aggregates = aggregates;
+    this.anomalies = anomalies;
+    this.events = events;
+    this.metrics = Collections.emptyMap();
+    this.datasets = Collections.emptyMap();
+    this.datasetForMetricId = Collections.emptyMap();
+  }
 
-  public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries,
-      Map<MetricSlice, DataFrame> aggregates, Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies,
-      Multimap<EventSlice, EventDTO> events) {
+  public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map<MetricSlice, DataFrame> aggregates,
+      Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies, Multimap<EventSlice, EventDTO> events,
+      Map<Long, MetricConfigDTO> metrics, Map<String, DatasetConfigDTO> datasets, Map<Long, DatasetConfigDTO> datasetForMetricId) {
     this.dataSpec = spec;
     this.timeseries = timeseries;
     this.aggregates = aggregates;
     this.anomalies = anomalies;
     this.events = events;
+    this.metrics = metrics;
+    this.datasets = datasets;
+    this.datasetForMetricId = datasetForMetricId;
   }
 
   public InputDataSpec getDataSpec() {
@@ -63,4 +84,16 @@ public class InputData {
   public Multimap<EventSlice, EventDTO> getEvents() {
     return events;
   }
+
+  public Map<Long, MetricConfigDTO> getMetrics() {
+    return metrics;
+  }
+
+  public Map<String, DatasetConfigDTO> getDatasets() {
+    return datasets;
+  }
+
+  public Map<Long, DatasetConfigDTO> getDatasetForMetricId() {
+    return datasetForMetricId;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputDataSpec.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
similarity index 60%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputDataSpec.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
index 1171338..9bd4304 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/InputDataSpec.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection;
+package com.linkedin.thirdeye.detection.spi.model;
 
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import java.util.Collection;
@@ -49,19 +49,41 @@ public class InputDataSpec {
   */
   final Collection<EventSlice> eventSlices;
 
+  /*
+    Metric ids to fetch the MetricConfigDTO for.
+   */
+  final Collection<Long> metricIds;
+
+  /*
+    Dataset names to fetch the DatasetConfigDTO for.
+  */
+  final Collection<String> datasetNames;
+
+  /*
+    Metric ids to fetch the DatasetConfigDTO for.
+  */
+  final Collection<Long> metricIdsForDatasets;
+
   public InputDataSpec() {
     this.timeseriesSlices = Collections.emptyList();
     this.aggregateSlices = Collections.emptyList();
     this.anomalySlices = Collections.emptyList();
     this.eventSlices = Collections.emptyList();
+    this.metricIds = Collections.emptyList();
+    this.datasetNames = Collections.emptyList();
+    this.metricIdsForDatasets = Collections.emptyList();
   }
 
   public InputDataSpec(Collection<MetricSlice> timeseriesSlices, Collection<MetricSlice> aggregateSlices,
-      Collection<AnomalySlice> anomalySlices, Collection<EventSlice> eventSlices) {
+      Collection<AnomalySlice> anomalySlices, Collection<EventSlice> eventSlices, Collection<Long> metricIds, Collection<String> datasetNames,
+      Collection<Long> metricIdsForDatasets) {
     this.timeseriesSlices = timeseriesSlices;
     this.aggregateSlices = aggregateSlices;
     this.anomalySlices = anomalySlices;
     this.eventSlices = eventSlices;
+    this.metricIds = metricIds;
+    this.datasetNames = datasetNames;
+    this.metricIdsForDatasets = metricIdsForDatasets;
   }
 
   public Collection<MetricSlice> getTimeseriesSlices() {
@@ -80,19 +102,44 @@ public class InputDataSpec {
     return eventSlices;
   }
 
+  public Collection<Long> getMetricIds() {
+    return metricIds;
+  }
+
+  public Collection<String> getDatasetNames() {
+    return datasetNames;
+  }
+
+  public Collection<Long> getMetricIdsForDatasets() {
+    return metricIdsForDatasets;
+  }
+
   public InputDataSpec withTimeseriesSlices(Collection<MetricSlice> timeseriesSlices) {
-    return new InputDataSpec(timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices);
+    return new InputDataSpec(timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
   }
 
   public InputDataSpec withAggregateSlices(Collection<MetricSlice> aggregateSlices) {
-    return new InputDataSpec(this.timeseriesSlices, aggregateSlices, this.anomalySlices, this.eventSlices);
+    return new InputDataSpec(this.timeseriesSlices, aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
   }
 
   public InputDataSpec withAnomalySlices(Collection<AnomalySlice> anomalySlices) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, anomalySlices, this.eventSlices);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
   }
 
   public InputDataSpec withEventSlices(Collection<EventSlice> eventSlices) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+  }
+
+  public InputDataSpec withMetricIds(Collection<Long> metricIds) {
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, metricIds, this.datasetNames, this.metricIdsForDatasets);
+  }
+
+  public InputDataSpec withDatasetNames(Collection<String> datasetNames) {
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, datasetNames, this.metricIdsForDatasets);
+  }
+
+  public InputDataSpec withMetricIdsForDataset(Collection<Long> metricIdsForDatasets) {
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, metricIdsForDatasets);
   }
 }
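
A component typically builds its request by chaining these with- methods; for example, the threshold detector above asks for one timeseries slice plus the dataset config of the same metric. A short sketch (the metric id, time range, and empty filter set are placeholders):

    import com.google.common.collect.ArrayListMultimap;
    import com.linkedin.thirdeye.dataframe.util.MetricSlice;
    import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
    import java.util.Collections;

    public class InputDataSpecSketch {
      // Hypothetical request: one timeseries slice plus the dataset config for the same metric id.
      static InputDataSpec buildSpec(long metricId, long start, long end) {
        MetricSlice slice = MetricSlice.from(metricId, start, end, ArrayListMultimap.<String, String>create());
        return new InputDataSpec()
            .withTimeseriesSlices(Collections.singleton(slice))
            .withMetricIdsForDataset(Collections.singleton(metricId));
      }
    }
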
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/TimeSeries.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/TimeSeries.java
new file mode 100644
index 0000000..bb7e127
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/TimeSeries.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.spi.model;
+
+import com.linkedin.thirdeye.dataframe.DataFrame;
+import com.linkedin.thirdeye.dataframe.DoubleSeries;
+import com.linkedin.thirdeye.dataframe.LongSeries;
+import com.linkedin.thirdeye.dataframe.util.DataFrameUtils;
+
+/**
+ * Time series. A wrapper around a DataFrame, used by baseline providers to return the predicted time series.
+ */
+public class TimeSeries {
+  private DataFrame df;
+
+  public TimeSeries() {
+    this.df = new DataFrame();
+  }
+
+  /**
+   * Add the timestamps into the time series.
+   * @param timestamps the timestamps of the series
+   */
+  public void addTimeStamps(LongSeries timestamps) {
+    this.df.addSeries(DataFrameUtils.COL_TIME, timestamps);
+  }
+
+  /**
+   * Add the predicted baseline into the timeseries
+   * @param baselineValues predicted baseline values
+   */
+  public void addPredictedBaseline(DoubleSeries baselineValues) {
+    this.df.addSeries(DataFrameUtils.COL_VALUE, baselineValues);
+  }
+
+  public static TimeSeries fromDataFrame(DataFrame df) {
+    TimeSeries ts = new TimeSeries();
+    ts.df.addSeries(DataFrameUtils.COL_TIME, df.get(DataFrameUtils.COL_TIME));
+    ts.df.addSeries(DataFrameUtils.COL_VALUE, df.get(DataFrameUtils.COL_VALUE));
+    return ts;
+  }
+
+  public DoubleSeries getPredictedBaseline() {
+    return this.df.getDoubles(DataFrameUtils.COL_VALUE);
+  }
+
+  public DataFrame getDataFrame() {
+    return df;
+  }
+}
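
For reference, a tiny sketch of building a TimeSeries from a two-column frame; the varargs addSeries overloads are assumed to behave as in the rest of the dataframe library.

    import com.linkedin.thirdeye.dataframe.DataFrame;
    import com.linkedin.thirdeye.dataframe.util.DataFrameUtils;
    import com.linkedin.thirdeye.detection.spi.model.TimeSeries;

    public class TimeSeriesSketch {
      public static void main(String[] args) {
        // A frame with the two columns TimeSeries expects: COL_TIME and COL_VALUE.
        DataFrame df = new DataFrame()
            .addSeries(DataFrameUtils.COL_TIME, 1000L, 2000L, 3000L)
            .addSeries(DataFrameUtils.COL_VALUE, 10.0, 12.5, 11.0);
        TimeSeries ts = TimeSeries.fromDataFrame(df);
        System.out.println(ts.getPredictedBaseline());  // the COL_VALUE column as a DoubleSeries
      }
    }
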
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyDetectionStageWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
similarity index 78%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyDetectionStageWrapper.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 9646de4..45d8270 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyDetectionStageWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm.stage;
+package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
 import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
@@ -27,6 +27,8 @@ import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
+import com.linkedin.thirdeye.detection.DetectionUtils;
+import com.linkedin.thirdeye.detection.spi.components.AnomalyDetector;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.util.ThirdEyeUtils;
 import java.util.ArrayList;
@@ -44,13 +46,11 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Anomaly Detection Stage Wrapper. This wrapper runs a anomaly detection stage and return the anomalies.
+ * Anomaly Detector Wrapper. This wrapper runs an anomaly detector and returns the anomalies it finds.
  * Optionally, detection can run in a moving-window fashion: the wrapper then calls the detector multiple times with a
  * sliding window. Each sliding window's start and end time are aligned to the data granularity, and the window size is set by the spec.
  */
-public class AnomalyDetectionStageWrapper extends DetectionPipeline {
-  private static final String PROP_STAGE_CLASSNAME = "stageClassName";
-  private static final String PROP_SPECS = "specs";
+public class AnomalyDetectorWrapper extends DetectionPipeline {
   private static final String PROP_METRIC_URN = "metricUrn";
 
   // moving window detection properties
@@ -60,12 +60,12 @@ public class AnomalyDetectionStageWrapper extends DetectionPipeline {
   private static final String PROP_WINDOW_SIZE = "windowSize";
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
-
-  private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionStageWrapper.class);
+  private static final String PROP_DETECTOR = "detector";
+  private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
 
   private final String metricUrn;
-  private final Map<String, Object> specs;
-  private final String stageClassName;
+  private final AnomalyDetector anomalyDetector;
 
   private final int windowDelay;
   private final TimeUnit windowDelayUnit;
@@ -77,27 +77,24 @@ public class AnomalyDetectionStageWrapper extends DetectionPipeline {
   // need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes.
   private final TimeGranularity functionFrequency;
 
-  public AnomalyDetectionStageWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+  public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
 
-    Map<String, Object> properties = config.getProperties();
-    Preconditions.checkArgument(properties.containsKey(PROP_STAGE_CLASSNAME), "Missing " + PROP_STAGE_CLASSNAME);
-
-    this.specs = MapUtils.getMap(properties, PROP_SPECS);
-    this.stageClassName = MapUtils.getString(properties, PROP_STAGE_CLASSNAME);
     this.metricUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
-    if (this.metricUrn != null) {
-      this.specs.put(PROP_METRIC_URN, metricUrn);
-    }
 
-    this.isMovingWindowDetection = MapUtils.getBooleanValue(this.specs, PROP_MOVING_WINDOW_DETECTION, false);
+    Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_DETECTOR), "Missing " + PROP_DETECTOR);
+    String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
+    Preconditions.checkArgument(this.config.getComponents().containsKey(detectorReferenceKey));
+    this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(detectorReferenceKey);
+
+    this.isMovingWindowDetection = MapUtils.getBooleanValue(config.getProperties(), PROP_MOVING_WINDOW_DETECTION, false);
     // delays to wait for data becomes available
-    this.windowDelay = MapUtils.getIntValue(this.specs, PROP_WINDOW_DELAY, 0);
-    this.windowDelayUnit = TimeUnit.valueOf(MapUtils.getString(this.specs, PROP_WINDOW_DELAY_UNIT, "DAYS"));
+    this.windowDelay = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_DELAY, 0);
+    this.windowDelayUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_DELAY_UNIT, "DAYS"));
     // detection window size
-    this.windowSize = MapUtils.getIntValue(this.specs, PROP_WINDOW_SIZE, 1);
-    this.windowUnit = TimeUnit.valueOf(MapUtils.getString(this.specs, PROP_WINDOW_UNIT, "DAYS"));
-    Map<String, Object> frequency = MapUtils.getMap(this.specs, PROP_FREQUENCY, Collections.emptyMap());
+    this.windowSize = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_SIZE, 1);
+    this.windowUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_UNIT, "DAYS"));
+    Map<String, Object> frequency = MapUtils.getMap(config.getProperties(), PROP_FREQUENCY, Collections.emptyMap());
     this.functionFrequency = new TimeGranularity(MapUtils.getIntValue(frequency, "size", 15), TimeUnit.valueOf(MapUtils.getString(frequency, "unit", "MINUTES")));
   }
 
@@ -106,9 +103,7 @@ public class AnomalyDetectionStageWrapper extends DetectionPipeline {
     List<Interval> monitoringWindows = this.getMonitoringWindows();
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     for (Interval window : monitoringWindows) {
-      AnomalyDetectionStage anomalyDetectionStage = this.loadAnomalyDetectorStage(this.stageClassName);
-      anomalyDetectionStage.init(specs, config.getId(), window.getStartMillis(), window.getEndMillis());
-      anomalies.addAll(anomalyDetectionStage.runDetection(this.provider));
+      anomalies.addAll(anomalyDetector.runDetection(window, this.metricUrn));
     }
 
     MetricEntity me = MetricEntity.fromURN(this.metricUrn);
@@ -119,15 +114,11 @@ public class AnomalyDetectionStageWrapper extends DetectionPipeline {
       anomaly.setMetricUrn(this.metricUrn);
       anomaly.setMetric(metric.getName());
       anomaly.setCollection(metric.getDataset());
-      anomaly.setDimensions(StageUtils.toFilterMap(me.getFilters()));
+      anomaly.setDimensions(DetectionUtils.toFilterMap(me.getFilters()));
     }
     return new DetectionPipelineResult(anomalies);
   }
 
-  private AnomalyDetectionStage loadAnomalyDetectorStage(String className) throws Exception {
-    return (AnomalyDetectionStage) Class.forName(className).newInstance();
-  }
-
   List<Interval> getMonitoringWindows() {
     if (this.isMovingWindowDetection) {
       try{
@@ -142,7 +133,7 @@ public class AnomalyDetectionStageWrapper extends DetectionPipeline {
         for (long monitoringEndTime : monitoringWindowEndTimes) {
           long endTime = monitoringEndTime - TimeUnit.MILLISECONDS.convert(windowDelay, windowDelayUnit);
           long startTime = endTime - TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
-          monitoringWindows.add(new Interval(startTime, endTime));
+          monitoringWindows.add(new Interval(startTime, endTime, dateTimeZone));
         }
         return monitoringWindows;
       } catch (Exception e) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyFilterStageWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyFilterWrapper.java
similarity index 67%
copy from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyFilterStageWrapper.java
copy to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyFilterWrapper.java
index 2eb8a8d..dba5f80 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/stage/AnomalyFilterStageWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyFilterWrapper.java
@@ -14,49 +14,49 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm.stage;
+package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.linkedin.thirdeye.datalayer.util.StringUtils;
 import com.linkedin.thirdeye.detection.ConfigUtils;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
+import com.linkedin.thirdeye.detection.DetectionUtils;
+import com.linkedin.thirdeye.detection.spi.components.AnomalyFilter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.commons.collections.MapUtils;
 
 
 /**
- * This anomaly filter wrapper allows user to plug in filter rules in the detection pipeline.
+ * This anomaly filter wrapper runs the configured anomaly filter component to filter the anomalies produced by the nested detection pipelines.
  */
-public class AnomalyFilterStageWrapper extends DetectionPipeline {
+public class AnomalyFilterWrapper extends DetectionPipeline {
   private static final String PROP_NESTED = "nested";
   private static final String PROP_CLASS_NAME = "className";
-  private static final String PROP_STAGE_CLASSNAME = "stageClassName";
-  private static final String PROP_SPECS = "specs";
   private static final String PROP_METRIC_URN = "metricUrn";
+  private static final String PROP_FILTER = "filter";
 
   private final List<Map<String, Object>> nestedProperties;
-  private final AnomalyFilterStage anomalyFilter;
+  private final AnomalyFilter anomalyFilter;
   private String metricUrn;
 
-  public AnomalyFilterStageWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
+  public AnomalyFilterWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
       throws Exception {
     super(provider, config, startTime, endTime);
     Map<String, Object> properties = config.getProperties();
     this.nestedProperties = ConfigUtils.getList(properties.get(PROP_NESTED));
-    Preconditions.checkArgument(properties.containsKey(PROP_STAGE_CLASSNAME), "Missing " + PROP_STAGE_CLASSNAME);
 
-    this.anomalyFilter = loadAnomalyFilterStage(MapUtils.getString(properties, PROP_STAGE_CLASSNAME));
-    this.anomalyFilter.init(MapUtils.getMap(properties, PROP_SPECS), config.getId(), startTime, endTime);
+    Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_FILTER), "Missing " + PROP_FILTER);
+    String filterReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_FILTER));
+    Preconditions.checkArgument(this.config.getComponents().containsKey(filterReferenceKey));
+    this.anomalyFilter = (AnomalyFilter) this.config.getComponents().get(filterReferenceKey);
+
     this.metricUrn = MapUtils.getString(properties, PROP_METRIC_URN);
   }
 
@@ -75,6 +75,7 @@ public class AnomalyFilterStageWrapper extends DetectionPipeline {
       nestedConfig.setId(this.config.getId());
       nestedConfig.setName(this.config.getName());
       nestedConfig.setProperties(properties);
+      nestedConfig.setComponents(this.config.getComponents());
       if (this.metricUrn != null){
         properties.put(PROP_METRIC_URN, this.metricUrn);
       }
@@ -85,18 +86,8 @@ public class AnomalyFilterStageWrapper extends DetectionPipeline {
     }
 
     Collection<MergedAnomalyResultDTO> anomalies =
-        Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
-          @Override
-          public boolean apply(@Nullable MergedAnomalyResultDTO mergedAnomaly) {
-            return mergedAnomaly != null && !mergedAnomaly.isChild() && anomalyFilter.isQualified(mergedAnomaly,
-                provider);
-          }
-        });
+        Collections2.filter(candidates, mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.isChild() && anomalyFilter.isQualified(mergedAnomaly));
 
     return new DetectionPipelineResult(new ArrayList<>(anomalies));
   }
-
-  private AnomalyFilterStage loadAnomalyFilterStage(String className) throws Exception {
-    return (AnomalyFilterStage) Class.forName(className).newInstance();
-  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
similarity index 62%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/BaselineFillingMergeWrapper.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 1d4860e..7dff42e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -14,19 +14,23 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm;
+package com.linkedin.thirdeye.detection.wrapper;
 
+import com.google.common.base.Preconditions;
+import com.linkedin.thirdeye.dataframe.Series;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
-import com.linkedin.thirdeye.detection.baseline.BaselineProvider;
-import com.linkedin.thirdeye.detection.baseline.BaselineProviderLoader;
-import com.linkedin.thirdeye.detection.baseline.RuleBaselineProvider;
+import com.linkedin.thirdeye.detection.DetectionUtils;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
+import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
+import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
+import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
+import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.collections.MapUtils;
@@ -47,20 +51,28 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
 
   private BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
   private BaselineProvider currentValueProvider;
+  private Series.DoubleFunction aggregationFunction;
 
   public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
-      throws Exception {
+  {
     super(provider, config, startTime, endTime);
+
     if (config.getProperties().containsKey(PROP_BASELINE_PROVIDER)) {
-      this.baselineValueProvider = BaselineProviderLoader.from(
-          MapUtils.getMap(config.getProperties(), PROP_BASELINE_PROVIDER));
+      String referenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_BASELINE_PROVIDER));
+      Preconditions.checkArgument(this.config.getComponents().containsKey(referenceKey));
+      this.baselineValueProvider = (BaselineProvider) this.config.getComponents().get(referenceKey);
     }
     if (config.getProperties().containsKey(PROP_CURRENT_PROVIDER)) {
-      this.currentValueProvider = BaselineProviderLoader.from(MapUtils.getMap(config.getProperties(), PROP_CURRENT_PROVIDER));
+      String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_CURRENT_PROVIDER));
+      Preconditions.checkArgument(this.config.getComponents().containsKey(detectorReferenceKey));
+      this.currentValueProvider = (BaselineProvider) this.config.getComponents().get(detectorReferenceKey);
     } else {
       // default current provider
       this.currentValueProvider = new RuleBaselineProvider();
-      this.currentValueProvider.init(Collections.<String, Object>singletonMap("offset", "current"));
+      RuleBaselineProviderSpec spec = new RuleBaselineProviderSpec();
+      spec.setOffset("current");
+      InputDataFetcher dataFetcher = new InputDataFetcher(this.provider, this.config.getId());
+      this.currentValueProvider.init(spec, dataFetcher);
     }
     String nestedUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
     if (nestedUrn != null){
@@ -68,6 +80,8 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
         properties.put(PROP_METRIC_URN, nestedUrn);
       }
     }
+
+    this.aggregationFunction = BaselineAggregateType.valueOf(MapUtils.getString(config.getProperties(), "metricFunction", "MEAN")).getFunction();
   }
 
   @Override
@@ -81,35 +95,20 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
    * @return anomalies with current and baseline value filled
    */
   List<MergedAnomalyResultDTO> fillCurrentAndBaselineValue(List<MergedAnomalyResultDTO> mergedAnomalies) {
-    Map<MetricSlice, MergedAnomalyResultDTO> metricSlicesToAnomaly = new HashMap<>();
-
     for (MergedAnomalyResultDTO anomaly : mergedAnomalies) {
       try {
         String metricUrn = anomaly.getMetricUrn();
         final MetricSlice slice = MetricSlice.from(MetricEntity.fromURN(metricUrn).getId(), anomaly.getStartTime(), anomaly.getEndTime(),
             MetricEntity.fromURN(metricUrn).getFilters());
-        metricSlicesToAnomaly.put(slice, anomaly);
+        anomaly.setAvgCurrentVal(this.currentValueProvider.computePredictedAggregates(slice, aggregationFunction));
+        if (this.baselineValueProvider != null) {
+          anomaly.setAvgBaselineVal(this.baselineValueProvider.computePredictedAggregates(slice, aggregationFunction));
+        }
       } catch (Exception e) {
         // ignore
         LOG.warn("cannot get metric slice for anomaly {}", anomaly, e);
       }
     }
-
-    Map<MetricSlice, Double> currentValues = this.currentValueProvider.computeBaselineAggregates(metricSlicesToAnomaly.keySet(), this.provider);
-    Map<MetricSlice, Double> baselineValues = new HashMap<>();
-    if (this.baselineValueProvider != null) {
-      baselineValues = this.baselineValueProvider.computeBaselineAggregates(metricSlicesToAnomaly.keySet(), this.provider);
-    }
-    for (Map.Entry<MetricSlice, MergedAnomalyResultDTO> entry : metricSlicesToAnomaly.entrySet()) {
-      MergedAnomalyResultDTO anomaly = entry.getValue();
-      MetricSlice slice = entry.getKey();
-      if (currentValues.containsKey(slice)){
-        anomaly.setAvgCurrentVal(currentValues.get(slice));
-      }
-      if (baselineValues.containsKey(slice)){
-        anomaly.setAvgBaselineVal(baselineValues.get(slice));
-      }
-    }
     return mergedAnomalies;
   }
 
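The default current-value provider above is now initialized from a typed spec plus an InputDataFetcher instead of a raw property map, and aggregates are computed per anomaly slice. A rough usage sketch of that flow follows; the helper name and all ids/timestamps are hypothetical, and only the ThirdEye classes and calls already shown in this diff are assumed to exist:

    // Sketch only: initialize a RuleBaselineProvider the same way the wrapper does
    // for its default current-value provider, then compute one aggregate.
    static double averageCurrentValue(DataProvider provider, long configId,
        long metricId, long start, long end) {
      RuleBaselineProvider current = new RuleBaselineProvider();
      RuleBaselineProviderSpec spec = new RuleBaselineProviderSpec();
      spec.setOffset("current");
      current.init(spec, new InputDataFetcher(provider, configId));

      MetricSlice slice = MetricSlice.from(metricId, start, end);
      return current.computePredictedAggregates(slice,
          BaselineAggregateType.MEAN.getFunction());
    }
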
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
similarity index 91%
rename from thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/ChildKeepingMergeWrapper.java
rename to thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index ed796dc..77536f9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -14,11 +14,12 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm;
+package com.linkedin.thirdeye.detection.wrapper;
 
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -42,17 +43,18 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
   @Override
   protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
-    Collections.sort(input, COMPARATOR);
+    Collections.sort(input, MergeWrapper.COMPARATOR);
 
     List<MergedAnomalyResultDTO> output = new ArrayList<>();
 
-    Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>();
+    Map<MergeWrapper.AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>();
     for (MergedAnomalyResultDTO anomaly : input) {
       if (anomaly.isChild()) {
         continue;
       }
 
-      AnomalyKey key = new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "");
+      MergeWrapper.AnomalyKey
+          key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "");
       MergedAnomalyResultDTO parent = parents.get(key);
 
       if (parent == null || anomaly.getStartTime() - parent.getEndTime() > this.maxGap) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 4c9957e..e913c60 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -2,25 +2,38 @@ package com.linkedin.thirdeye.detection.yaml;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.ConfigUtils;
-import com.linkedin.thirdeye.detection.algorithm.BaselineFillingMergeWrapper;
-import com.linkedin.thirdeye.detection.algorithm.ChildKeepingMergeWrapper;
+import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
 import com.linkedin.thirdeye.detection.algorithm.DimensionWrapper;
-import com.linkedin.thirdeye.detection.algorithm.stage.AnomalyDetectionStageWrapper;
-import com.linkedin.thirdeye.detection.algorithm.stage.AnomalyFilterStageWrapper;
 import com.linkedin.thirdeye.detection.annotation.DetectionRegistry;
+import com.linkedin.thirdeye.detection.annotation.Yaml;
+import com.linkedin.thirdeye.detection.spec.AbstractSpec;
+import com.linkedin.thirdeye.detection.spi.components.Tunable;
+import com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
+import com.linkedin.thirdeye.detection.wrapper.AnomalyFilterWrapper;
+import com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper;
+import com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
+import com.linkedin.thirdeye.detection.DetectionUtils;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.collections.MapUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
 
 import static com.linkedin.thirdeye.detection.ConfigUtils.*;
+import static com.linkedin.thirdeye.detection.DetectionUtils.*;
 
 
 /**
@@ -94,6 +107,7 @@ import static com.linkedin.thirdeye.detection.ConfigUtils.*;
  * +-----------------------------------------+
  *
  */
+@Yaml(pipelineType = "COMPOSITE")
 public class CompositePipelineConfigTranslator extends YamlDetectionConfigTranslator {
   private static final String PROP_DIMENSION_EXPLORATION = "dimensionExploration";
 
@@ -102,33 +116,54 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
   private static final String PROP_FILTERS = "filters";
   private static final String PROP_METRIC = "metric";
   private static final String PROP_DATASET = "dataset";
-  private static final String PROP_STAGE_CLASSNAME = "stageClassName";
   private static final String PROP_TYPE = "type";
   private static final String PROP_CLASS_NAME = "className";
-  private static final String PROP_SPEC = "specs";
+  private static final String PROP_PARAMS = "params";
   private static final String PROP_METRIC_URN = "metricUrn";
-  private static final String PROP_ANOMALY_DETECTION = "anomalyDetection";
+  private static final String PROP_RULES = "rules";
   private static final String PROP_NESTED = "nested";
+  private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
+  private static final String PROP_NAME = "name";
+  private static final String PROP_DETECTOR = "detector";
 
   private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
+  private static final Map<String, String> DETECTOR_TO_BASELINE = ImmutableMap.of();
+
+  private final Map<String, Object> components = new HashMap<>();
+  private MetricConfigDTO metricConfig;
+  private DatasetConfigDTO datasetConfig;
+
+  public CompositePipelineConfigTranslator(Map<String, Object> yamlConfig, DataProvider provider) {
+    super(yamlConfig, provider);
+  }
 
   @Override
-  Map<String, Object> buildDetectionProperties(Map<String, Object> yamlConfig) {
+  YamlTranslationResult translateYaml() {
+    this.metricConfig = this.dataProvider.fetchMetric(MapUtils.getString(yamlConfig, PROP_METRIC),
+        MapUtils.getString(yamlConfig, PROP_DATASET));
+    Preconditions.checkNotNull(this.metricConfig, "Metric not found");
+
+    this.datasetConfig = this.dataProvider.fetchDatasets(Collections.singletonList(metricConfig.getDataset()))
+        .get(metricConfig.getDataset());
+    Preconditions.checkNotNull(this.datasetConfig, "dataset not found");
+
     String metricUrn = buildMetricUrn(yamlConfig);
+    String cron = buildCron();
 
-    List<Map<String, Object>> detectionYamls = getList(yamlConfig.get(PROP_ANOMALY_DETECTION));
+    List<Map<String, Object>> ruleYamls = getList(yamlConfig.get(PROP_RULES));
     List<Map<String, Object>> nestedPipelines = new ArrayList<>();
-    for (Map<String, Object> detectionYaml : detectionYamls) {
-      List<Map<String, Object>> filterYamls = ConfigUtils.getList(detectionYaml.get(PROP_FILTER));
-      Map<String, Object> detectionProperties = buildWrapperProperties(BaselineFillingMergeWrapper.class.getName(), buildListOfDetectionCoreProperties(
-          ConfigUtils.<Map<String, Object>>getList(detectionYaml.get(PROP_DETECTION))));
+    for (Map<String, Object> ruleYaml : ruleYamls) {
+      String ruleName = MapUtils.getString(ruleYaml, PROP_NAME);
+      List<Map<String, Object>> filterYamls = ConfigUtils.getList(ruleYaml.get(PROP_FILTER));
+      List<Map<String, Object>> detectionYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
+      List<Map<String, Object>> detectionProperties = buildListOfMergeWrapperProperties(ruleName, detectionYamls);
       if (filterYamls == null || filterYamls.isEmpty()) {
-        nestedPipelines.add(detectionProperties);
+        nestedPipelines.addAll(detectionProperties);
       } else {
-        List<Map<String, Object>> filterNestedProperties = Collections.singletonList(detectionProperties);
+        List<Map<String, Object>> filterNestedProperties = detectionProperties;
         for (Map<String, Object> filterProperties : filterYamls) {
-          filterNestedProperties = new ArrayList<>(buildStageWrapperProperties(AnomalyFilterStageWrapper.class.getName(),
-              filterProperties, filterNestedProperties));
+          filterNestedProperties = buildFilterWrapperProperties(AnomalyFilterWrapper.class.getName(), filterProperties,
+              filterNestedProperties, ruleName);
         }
         nestedPipelines.addAll(filterNestedProperties);
       }
@@ -136,12 +171,47 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     Map<String, Object> dimensionWrapperProperties = new HashMap<>();
     dimensionWrapperProperties.putAll(MapUtils.getMap(yamlConfig, PROP_DIMENSION_EXPLORATION));
     dimensionWrapperProperties.put(PROP_METRIC_URN, metricUrn);
-    return buildWrapperProperties(ChildKeepingMergeWrapper.class.getName(), Collections.singletonList(
-        buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)));
+    Map<String, Object> properties = buildWrapperProperties(ChildKeepingMergeWrapper.class.getName(),
+        Collections.singletonList(
+            buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)));
+    return new YamlTranslationResult().withProperties(properties).withComponents(this.components).withCron(cron);
+  }
+
+  private List<Map<String, Object>> buildListOfMergeWrapperProperties(String ruleName,
+      List<Map<String, Object>> yamlConfigs) {
+    List<Map<String, Object>> properties = new ArrayList<>();
+    for (Map<String, Object> yamlConfig : yamlConfigs) {
+      properties.add(buildMergeWrapperProperties(ruleName, yamlConfig));
+    }
+    return properties;
   }
 
-  private Collection<Map<String, Object>> buildStageWrapperProperties(String wrapperClassName,
-      Map<String, Object> yamlConfig, List<Map<String, Object>> nestedProperties) {
+  private Map<String, Object> buildMergeWrapperProperties(String ruleName, Map<String, Object> yamlConfig) {
+    String detectorType = MapUtils.getString(yamlConfig, PROP_TYPE);
+    Map<String, Object> nestedProperties = new HashMap<>();
+    nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
+    String detectorKey = makeComponentKey(ruleName, detectorType);
+    nestedProperties.put(PROP_DETECTOR, detectorKey);
+    // TODO insert window size & unit
+
+    buildComponentSpec(yamlConfig, detectorType, detectorKey);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(PROP_CLASS_NAME, BaselineFillingMergeWrapper.class.getName());
+    properties.put(PROP_NESTED, Collections.singletonList(nestedProperties));
+    String baselineProviderType =  "RULE_BASELINE";
+    if (DETECTOR_TO_BASELINE.containsKey(detectorType)) {
+      baselineProviderType = DETECTOR_TO_BASELINE.get(detectorType);
+    }
+    String baselineProviderKey = makeComponentKey(ruleName + "_" + detectorType,  baselineProviderType);
+    properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
+    buildComponentSpec(yamlConfig, baselineProviderType, baselineProviderKey);
+
+    return properties;
+  }
+
+  private List<Map<String, Object>> buildFilterWrapperProperties(String wrapperClassName,
+      Map<String, Object> yamlConfig, List<Map<String, Object>> nestedProperties, String ruleName) {
     if (yamlConfig == null || yamlConfig.isEmpty()) {
       return nestedProperties;
     }
@@ -149,13 +219,17 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     if (wrapperProperties.isEmpty()) {
       return Collections.emptyList();
     }
-    fillStageSpecs(wrapperProperties, yamlConfig);
+    String filterType = MapUtils.getString(yamlConfig, PROP_TYPE);
+    String filterKey = makeComponentKey(ruleName, filterType);
+    wrapperProperties.put(PROP_FILTER, filterKey);
+    buildComponentSpec(yamlConfig, filterType, filterKey);
+
     return Collections.singletonList(wrapperProperties);
   }
 
   private Map<String, Object> buildWrapperProperties(String wrapperClassName,
       List<Map<String, Object>> nestedProperties) {
-    return buildWrapperProperties(wrapperClassName, nestedProperties, Collections.<String, Object>emptyMap());
+    return buildWrapperProperties(wrapperClassName, nestedProperties, Collections.emptyMap());
   }
 
   private Map<String, Object> buildWrapperProperties(String wrapperClassName,
@@ -176,32 +250,17 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     return properties;
   }
 
-  private List<Map<String, Object>> buildListOfDetectionCoreProperties(List<Map<String, Object>> yamlConfigs) {
-    List<Map<String, Object>> properties = new ArrayList<>();
-    for (Map<String, Object> yamlConfig : yamlConfigs) {
-      properties.add(buildDetectionCoreProperties(AnomalyDetectionStageWrapper.class.getName(), yamlConfig));
-    }
-    return properties;
-  }
-
-  private Map<String, Object> buildDetectionCoreProperties(String wrapperClassName, Map<String, Object> yamlConfig) {
-    Map<String, Object> properties = new HashMap<>();
-    properties.put(PROP_CLASS_NAME, wrapperClassName);
-    fillStageSpecs(properties, yamlConfig);
-    return properties;
-  }
-
-  private void fillStageSpecs(Map<String, Object> properties, Map<String, Object> detectionCoreYamlConfigs) {
-    Map<String, Object> specs = new HashMap<>();
-    for (Map.Entry<String, Object> entry : detectionCoreYamlConfigs.entrySet()) {
-      if (entry.getKey().equals(PROP_TYPE)) {
-        properties.put(PROP_STAGE_CLASSNAME,
-            DETECTION_REGISTRY.lookup(MapUtils.getString(detectionCoreYamlConfigs, PROP_TYPE)));
-      } else {
-        specs.put(entry.getKey(), entry.getValue());
-      }
+  private String buildCron() {
+    switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
+      case MINUTES:
+        return "0 0/15 * * * ? *";
+      case HOURS:
+        return "0 0 * * * ? *";
+      case DAYS:
+        return "0 0 14 * * ? *";
+      default:
+        return "0 0 0 * * ?";
     }
-    properties.put(PROP_SPEC, specs);
   }
 
   private String buildMetricUrn(Map<String, Object> yamlConfig) {
@@ -213,33 +272,84 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
         filters.putAll(entry.getKey(), entry.getValue());
       }
     }
-    MetricConfigDTO metricConfig = this.metricDAO.findByMetricAndDataset(MapUtils.getString(yamlConfig, PROP_METRIC),
-        MapUtils.getString(yamlConfig, PROP_DATASET));
-    Preconditions.checkNotNull(metricConfig, "Metric not found");
 
-    MetricEntity me = MetricEntity.fromMetric(1.0, metricConfig.getId(), filters);
+    MetricEntity me = MetricEntity.fromMetric(1.0, this.metricConfig.getId(), filters);
     return me.getUrn();
   }
 
+  private void buildComponentSpec(Map<String, Object> yamlConfig, String type, String componentKey) {
+    String componentName = DetectionUtils.getComponentName(componentKey);
+    String componentClassName = DETECTION_REGISTRY.lookup(type);
+    Map<String, Object> componentSpecs = new HashMap<>();
+    componentSpecs.put(PROP_CLASS_NAME, componentClassName);
+    Map<String, Object> params = MapUtils.getMap(yamlConfig, PROP_PARAMS);
+
+    if (DETECTION_REGISTRY.isTunable(componentClassName)) {
+      try {
+        componentSpecs.putAll(getTunedSpecs(componentName, componentClassName, params));
+      } catch (Exception e) {
+        LOG.error("Tuning failed for component " + type, e);
+      }
+    } else {
+      componentSpecs.putAll(params);
+    }
+    this.components.put(componentName, componentSpecs);
+  }
+
+  private Map<String, Object> getTunedSpecs(String componentName, String componentClassName, Map<String, Object> params)
+      throws Exception {
+    long configId = this.existingConfig == null ? -1 : this.existingConfig.getId();
+    InputDataFetcher dataFetcher = new InputDataFetcher(this.dataProvider, configId);
+    Tunable tunable = getTunable(componentClassName, params, dataFetcher);
+    Interval window = new Interval(this.startTime, this.endTime, DateTimeZone.forID(this.datasetConfig.getTimezone()));
+    Map<String, Object> existingComponentSpec =
+        this.existingComponentSpecs.containsKey(componentName) ? MapUtils.getMap(this.existingComponentSpecs,
+            componentName) : Collections.emptyMap();
+
+    return tunable.tune(existingComponentSpec, window);
+  }
+
+  private Tunable getTunable(String componentClassName, Map<String, Object> params, InputDataFetcher dataFetcher)
+      throws Exception {
+    String tunableClassName = DETECTION_REGISTRY.lookupTunable(componentClassName);
+    Class clazz = Class.forName(tunableClassName);
+    Class<AbstractSpec> specClazz = (Class<AbstractSpec>) Class.forName(getSpecClassName(clazz));
+    AbstractSpec spec = AbstractSpec.fromProperties(params, specClazz);
+    Tunable tunable = (Tunable) clazz.newInstance();
+    tunable.init(spec, dataFetcher);
+    return tunable;
+  }
+
+  private String makeComponentKey(String name, String type) {
+    return "$" + name + "_" + type;
+  }
+
   @Override
   protected void validateYAML(Map<String, Object> yamlConfig) {
     super.validateYAML(yamlConfig);
     Preconditions.checkArgument(yamlConfig.containsKey(PROP_METRIC), "Property missing " + PROP_METRIC);
     Preconditions.checkArgument(yamlConfig.containsKey(PROP_DATASET), "Property missing " + PROP_DATASET);
-    Preconditions.checkArgument(yamlConfig.containsKey(PROP_ANOMALY_DETECTION), "Property missing " + PROP_ANOMALY_DETECTION);
-
-    List<Map<String, Object>> detectionYamls = getList(yamlConfig.get(PROP_ANOMALY_DETECTION));
-    for (int i = 0; i < detectionYamls.size(); i++) {
-      Map<String, Object> detectionYaml = detectionYamls.get(i);
-      Preconditions.checkArgument(detectionYaml.containsKey(PROP_DETECTION), "In rule No." + (i+1) + ", detection stage property missing. ");
-      List<Map<String, Object>> detectionStageYamls = ConfigUtils.getList(detectionYaml.get(PROP_DETECTION));
+    Preconditions.checkArgument(yamlConfig.containsKey(PROP_RULES), "Property missing " + PROP_RULES);
+    Set<String> names = new HashSet<>();
+    List<Map<String, Object>> ruleYamls = getList(yamlConfig.get(PROP_RULES));
+    for (int i = 0; i < ruleYamls.size(); i++) {
+      Map<String, Object> ruleYaml = ruleYamls.get(i);
+      Preconditions.checkArgument(ruleYaml.containsKey(PROP_NAME), "In rule No." + (i + 1) + ", rule name property missing. ");
+      String name = MapUtils.getString(ruleYaml, PROP_NAME);
+      Preconditions.checkArgument(!names.contains(name), "In rule No." + (i + 1) + ", found duplicated rule name: " + name + ". Rule name must be unique.");
+      names.add(name);
+      Preconditions.checkArgument(ruleYaml.containsKey(PROP_DETECTION),
+          "In rule No." + (i + 1) + ", detection stage property missing. ");
+      List<Map<String, Object>> detectionStageYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
       for (Map<String, Object> detectionStageYaml : detectionStageYamls) {
-        Preconditions.checkArgument(detectionStageYaml.containsKey(PROP_TYPE), "In rule No." + (i+1) + ", detection stage type missing. ");
+        Preconditions.checkArgument(detectionStageYaml.containsKey(PROP_TYPE),
+            "In rule No." + (i + 1) + ", detection stage type missing. ");
       }
-      if (detectionYaml.containsKey(PROP_FILTER)) {
-        List<Map<String, Object>> filterStageYamls = ConfigUtils.getList(MapUtils.getMap(detectionYaml, PROP_FILTER));
+      if (ruleYaml.containsKey(PROP_FILTER)) {
+        List<Map<String, Object>> filterStageYamls = ConfigUtils.getList(MapUtils.getMap(ruleYaml, PROP_FILTER));
         for (Map<String, Object> filterStageYaml : filterStageYamls) {
-          Preconditions.checkArgument(filterStageYaml.containsKey(PROP_TYPE), "In rule No." + (i+1) + ", filter stage type missing. ");
+          Preconditions.checkArgument(filterStageYaml.containsKey(PROP_TYPE),
+              "In rule No." + (i + 1) + ", filter stage type missing. ");
         }
       }
     }
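
To make the new rule-based structure concrete, here is a hypothetical end-to-end sketch. The key names follow the PROP_* constants above (assuming PROP_DETECTION still resolves to "detection"); the detector type, metric, dataset and parameter values are made up; and generateDetectionConfig() is package-private, so the helper is assumed to live in the detection.yaml package, as YamlResource does:

    // Sketch only: translate a hypothetical rule-based YAML payload.
    static DetectionConfigDTO translateExample(DataProvider provider, long startTime, long endTime)
        throws Exception {
      String payload =
            "detectionName: my_detection\n"
          + "pipelineType: COMPOSITE\n"
          + "metric: page_views\n"
          + "dataset: my_dataset\n"
          + "rules:\n"
          + "- name: rule_1\n"
          + "  detection:\n"
          + "  - type: THRESHOLD\n"       // must be a detector type registered in DetectionRegistry
          + "    params:\n"
          + "      max: 100\n";
      Map<String, Object> yamlConfig = (Map<String, Object>) new org.yaml.snakeyaml.Yaml().load(payload);

      YamlDetectionConfigTranslator translator = new YamlDetectionTranslatorLoader().from(yamlConfig, provider);
      return translator.withTrainingWindow(startTime, endTime).generateDetectionConfig();
    }
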
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
index 9b850c4..be383bc 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
@@ -1,59 +1,76 @@
 package com.linkedin.thirdeye.detection.yaml;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import com.linkedin.thirdeye.datalayer.bao.MetricConfigManager;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
-import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
-import com.linkedin.thirdeye.datasource.DAORegistry;
-import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
-import java.util.Collection;
-import java.util.List;
+import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.DetectionPipeline;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.collections.MapUtils;
-
-import static com.linkedin.thirdeye.detection.ConfigUtils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * The YAML config translator converts the yaml config into a detection config
+ * The YAML config translator converts the yaml config into a detection config.
+ * Calls the tuning (training) module for each tunable component.
  */
 public abstract class YamlDetectionConfigTranslator {
-  private static final String PROP_NAME = "name";
-  private static final String PROP_CRON = "cron";
+  protected static final Logger LOG = LoggerFactory.getLogger(YamlDetectionConfigTranslator.class);
+  private static final String PROP_NAME = "detectionName";
 
-  private static final String CRON_SCHEDULE_DEFAULT = "0 0 14 * * ? *";
 
-  private static final DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
+  protected Map<String, Object> yamlConfig;
+  protected long startTime;
+  protected long endTime;
+  protected DataProvider dataProvider;
+  protected DetectionConfigDTO existingConfig;
+  protected Map<String, Object> existingComponentSpecs;
 
-  MetricConfigManager metricDAO;
+  public YamlDetectionConfigTranslator(Map<String, Object> yamlConfig, DataProvider provider) {
+    this.yamlConfig = yamlConfig;
+    this.dataProvider = provider;
+    this.existingComponentSpecs = new HashMap<>();
+  }
 
-  public YamlDetectionConfigTranslator() {
-    this.metricDAO = DAO_REGISTRY.getMetricConfigDAO();
+  public YamlDetectionConfigTranslator withTrainingWindow(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    return this;
+  }
+
+  public YamlDetectionConfigTranslator withExistingDetectionConfig(DetectionConfigDTO existingDTO) {
+    this.existingConfig = existingDTO;
+    this.existingComponentSpecs = existingDTO.getComponentSpecs();
+    return this;
   }
 
   /**
    * Convert Yaml configurations into detection properties. Can be customized and overridden by different detection flows.
-   * @param yamlConfig yaml configuration of a detection pipeline flow type
    * @return the translation result holding the detection pipeline properties and component specs
    */
-  abstract Map<String, Object> buildDetectionProperties(Map<String, Object> yamlConfig);
+  abstract YamlTranslationResult translateYaml();
 
   /**
    * Fill in common fields of detection config. Properties of the pipeline are filled by the subclass.
    */
-  DetectionConfigDTO generateDetectionConfig(Map<String, Object> yamlConfig) {
+  DetectionConfigDTO generateDetectionConfig() {
     validateYAML(yamlConfig);
 
     DetectionConfigDTO config = new DetectionConfigDTO();
     config.setName(MapUtils.getString(yamlConfig, PROP_NAME));
-    config.setCron(MapUtils.getString(yamlConfig, PROP_CRON, CRON_SCHEDULE_DEFAULT));
     config.setLastTimestamp(System.currentTimeMillis());
     config.setActive(true);
-    Map<String, Object> properties = buildDetectionProperties(yamlConfig);
-    Preconditions.checkArgument(!properties.isEmpty(), "Empty detection property");
-    config.setProperties(properties);
+    YamlTranslationResult translationResult = translateYaml();
+    Preconditions.checkArgument(!translationResult.getProperties().isEmpty(), "Empty detection property");
+    config.setProperties(translationResult.getProperties());
+    config.setComponentSpecs(translationResult.getComponents());
+    config.setCron(translationResult.getCron());
+    if (existingConfig != null) {
+      config.setId(existingConfig.getId());
+      config.setLastTimestamp(existingConfig.getLastTimestamp());
+    }
+
     return config;
   }
 
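Condensed into one helper, the per-component tuning flow that the concrete translators run (a sketch assuming the Tunable and AbstractSpec signatures are exactly as used by CompositePipelineConfigTranslator.getTunedSpecs above; the helper name is hypothetical):

    // Sketch only: resolve the Tunable registered for a component class, initialize
    // it with the user-supplied spec, and let it produce tuned specs over the window.
    static Map<String, Object> tuneComponent(String componentClassName, AbstractSpec paramSpec,
        InputDataFetcher dataFetcher, Map<String, Object> existingSpec, Interval trainingWindow)
        throws Exception {
      String tunableClassName = DetectionRegistry.getInstance().lookupTunable(componentClassName);
      Tunable tunable = (Tunable) Class.forName(tunableClassName).newInstance();
      tunable.init(paramSpec, dataFetcher);
      return tunable.tune(existingSpec, trainingWindow);
    }
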
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionTranslatorLoader.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionTranslatorLoader.java
index 58887f4..0d6b231 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionTranslatorLoader.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionTranslatorLoader.java
@@ -1,7 +1,8 @@
 package com.linkedin.thirdeye.detection.yaml;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.annotation.DetectionRegistry;
+import java.lang.reflect.Constructor;
 import java.util.Map;
 
 /**
@@ -9,15 +10,12 @@ import java.util.Map;
  */
 public class YamlDetectionTranslatorLoader {
   private static final String PROP_PIPELINE_TYPE= "pipelineType";
+  private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
 
-  private static final Map<String, String> PIPELINE_TYPE_REGISTRY = ImmutableMap.<String, String>builder()
-      .put("COMPOSITE", CompositePipelineConfigTranslator.class.getName())
-      .build();
-
-  public YamlDetectionConfigTranslator from(Map<String, Object> yamlConfig) throws Exception {
-    Preconditions.checkArgument(yamlConfig.containsKey(PROP_PIPELINE_TYPE), "pipeline type not found, abort.");
-    String className = this.PIPELINE_TYPE_REGISTRY.get(yamlConfig.get(PROP_PIPELINE_TYPE).toString().toUpperCase());
-    return (YamlDetectionConfigTranslator) Class.forName(className).newInstance();
+  public YamlDetectionConfigTranslator from(Map<String, Object> yamlConfig, DataProvider provider) throws Exception {
+    String className = DETECTION_REGISTRY.lookupYamlConverter(yamlConfig.get(PROP_PIPELINE_TYPE).toString().toUpperCase());
+    Constructor<?> constructor = Class.forName(className).getConstructor(Map.class, DataProvider.class);
+    return (YamlDetectionConfigTranslator) constructor.newInstance(yamlConfig, provider);
   }
 
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
index 4f8bbe1..259714b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
@@ -3,13 +3,25 @@ package com.linkedin.thirdeye.detection.yaml;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.datalayer.bao.DatasetConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
+import com.linkedin.thirdeye.datalayer.bao.EventManager;
+import com.linkedin.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import com.linkedin.thirdeye.datalayer.bao.MetricConfigManager;
 import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.util.Predicate;
 import com.linkedin.thirdeye.datasource.DAORegistry;
+import com.linkedin.thirdeye.datasource.ThirdEyeCacheRegistry;
+import com.linkedin.thirdeye.datasource.loader.AggregationLoader;
+import com.linkedin.thirdeye.datasource.loader.DefaultAggregationLoader;
+import com.linkedin.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
 import com.linkedin.thirdeye.detection.ConfigUtils;
+import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.DefaultDataProvider;
+import com.linkedin.thirdeye.detection.DetectionPipelineLoader;
 import com.wordnik.swagger.annotations.ApiParam;
 import java.util.Collections;
 import java.util.HashMap;
@@ -21,6 +33,7 @@ import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.MapUtils;
@@ -29,7 +42,7 @@ import org.yaml.snakeyaml.Yaml;
 
 @Path("/yaml")
 public class YamlResource {
-  private static final String PROP_NAME = "name";
+  private static final String PROP_NAME = "detectionName";
   private static final String PROP_TYPE = "type";
   private static final String PROP_DETECTION_CONFIG_ID = "detectionConfigIds";
 
@@ -39,12 +52,33 @@ public class YamlResource {
   private final DetectionAlertConfigManager detectionAlertConfigDAO;
   private final YamlDetectionTranslatorLoader translatorLoader;
   private final YamlDetectionAlertConfigTranslator alertConfigTranslator;
+  private final DataProvider provider;
+  private final MetricConfigManager metricDAO;
+  private final DatasetConfigManager datasetDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final DetectionPipelineLoader loader;
 
   public YamlResource() {
     this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
     this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager();
     this.translatorLoader = new YamlDetectionTranslatorLoader();
     this.alertConfigTranslator = new YamlDetectionAlertConfigTranslator();
+    this.metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricDAO, datasetDAO, eventDAO, anomalyDAO, timeseriesLoader, aggregationLoader, loader);
   }
 
   /**
@@ -55,28 +89,28 @@ public class YamlResource {
   @POST
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.TEXT_PLAIN)
-  public Response setUpDetectionPipeline(@ApiParam("payload") String payload) throws Exception {
+  public Response setUpDetectionPipeline(@ApiParam("payload") String payload,
+      @QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime) throws Exception {
     if (Strings.isNullOrEmpty(payload)) {
       throw new IllegalArgumentException("Empty Payload");
     }
     Map<String, Object> yamlConfig = (Map<String, Object>) this.YAML_READER.load(payload);
 
-    DetectionConfigDTO detectionConfig;
-    // translate yaml to detection config
-    try {
-      YamlDetectionConfigTranslator translator = translatorLoader.from(yamlConfig);
-      detectionConfig = translator.generateDetectionConfig(yamlConfig);
-    } catch (Exception e) {
-      return Response.status(400).entity(ImmutableMap.of("status", 400, "message", e.getMessage())).build();
-    }
-
+    Preconditions.checkArgument(yamlConfig.containsKey(PROP_NAME), "missing " + PROP_NAME);
     // retrieve id if detection config already exists
     List<DetectionConfigDTO> detectionConfigDTOs =
         this.detectionConfigDAO.findByPredicate(Predicate.EQ("name", MapUtils.getString(yamlConfig, PROP_NAME)));
+    DetectionConfigDTO existingDetectionConfig = null;
     if (!detectionConfigDTOs.isEmpty()) {
-      DetectionConfigDTO existingDetectionConfig = detectionConfigDTOs.get(0);
-      detectionConfig.setId(detectionConfigDTOs.get(0).getId());
-      detectionConfig.setLastTimestamp(existingDetectionConfig.getLastTimestamp());
+      existingDetectionConfig = detectionConfigDTOs.get(0);
+    }
+
+    YamlDetectionConfigTranslator translator = this.translatorLoader.from(yamlConfig, this.provider);
+    DetectionConfigDTO detectionConfig;
+    try {
+      detectionConfig = translator.withTrainingWindow(startTime, endTime).withExistingDetectionConfig(existingDetectionConfig).generateDetectionConfig();
+    } catch (Exception e) {
+      return Response.status(400).entity(ImmutableMap.of("status", "400", "message", e.getMessage())).build();
     }
     detectionConfig.setYaml(payload);
     Long detectionConfigId = this.detectionConfigDAO.save(detectionConfig);
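
For illustration, a plain HttpURLConnection call against the updated endpoint; the host, port, time window and helper name are hypothetical, and the resource is assumed to be mounted at /yaml on the dashboard server:

    // Sketch only: POST a YAML payload together with an explicit training window.
    static int postYaml(String yamlPayload) throws IOException {
      URL url = new URL("http://localhost:1426/yaml?startTime=1538520728000&endTime=1541112728000");
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setRequestMethod("POST");
      conn.setRequestProperty("Content-Type", "text/plain");
      conn.setDoOutput(true);
      try (OutputStream os = conn.getOutputStream()) {
        os.write(yamlPayload.getBytes(StandardCharsets.UTF_8));
      }
      return conn.getResponseCode();  // 400 with an error message if translation fails (see above)
    }
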
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlTranslationResult.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlTranslationResult.java
new file mode 100644
index 0000000..5424a4f
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlTranslationResult.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.yaml;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+
+public class YamlTranslationResult {
+  private static final String CRON_SCHEDULE_DEFAULT = "0 0 14 * * ? *";
+
+  private final Map<String, Object> properties;
+  private final Map<String, Object> components;
+  private final String cron;
+
+  public YamlTranslationResult() {
+    this.properties = new HashMap<>();
+    this.components = new HashMap<>();
+    this.cron = CRON_SCHEDULE_DEFAULT;
+  }
+
+  public YamlTranslationResult(Map<String, Object> properties, Map<String, Object> components, String cron) {
+    this.properties = properties;
+    this.components = components;
+    this.cron = cron;
+  }
+
+  public Map<String, Object> getProperties() {
+    return properties;
+  }
+
+  public Map<String, Object> getComponents() {
+    return components;
+  }
+
+  public String getCron() {
+    return cron;
+  }
+
+  YamlTranslationResult withProperties(Map<String, Object> properties) {
+    return new YamlTranslationResult(properties, this.components, this.cron);
+  }
+
+  YamlTranslationResult withComponents(Map<String, Object> components) {
+    return new YamlTranslationResult(this.properties, components, this.cron);
+  }
+
+  YamlTranslationResult withCron(String cron) {
+    return new YamlTranslationResult(this.properties, components, cron);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof YamlTranslationResult)) {
+      return false;
+    }
+    YamlTranslationResult that = (YamlTranslationResult) o;
+    return Objects.equals(properties, that.properties) && Objects.equals(components, that.components) && Objects.equals(
+        cron, that.cron);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(properties, components, cron);
+  }
+}
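
The with* methods above are package-private and each returns a new immutable instance, so a translator can assemble the result in a single chain. A minimal sketch with illustrative values (and assuming the caller sits in the detection.yaml package):

    // Sketch only: build a translation result with an hourly cron.
    YamlTranslationResult result = new YamlTranslationResult()
        .withProperties(Collections.singletonMap("className", ChildKeepingMergeWrapper.class.getName()))
        .withComponents(Collections.singletonMap("rule_1_THRESHOLD", new HashMap<String, Object>()))
        .withCron("0 0 * * * ? *");
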
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/rootcause/impl/MetricEntity.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/rootcause/impl/MetricEntity.java
index 9f4735f..3423ce5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/rootcause/impl/MetricEntity.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/rootcause/impl/MetricEntity.java
@@ -23,7 +23,6 @@ import com.linkedin.thirdeye.rootcause.Entity;
 import com.linkedin.thirdeye.rootcause.util.EntityUtils;
 import com.linkedin.thirdeye.rootcause.util.ParsedUrn;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
diff --git a/thirdeye/thirdeye-pinot/src/main/resources/com/linkedin/thirdeye/dashboard/views/admin/thirdeye-admin.ftl b/thirdeye/thirdeye-pinot/src/main/resources/com/linkedin/thirdeye/dashboard/views/admin/thirdeye-admin.ftl
index 0e646e1..13a4419 100644
--- a/thirdeye/thirdeye-pinot/src/main/resources/com/linkedin/thirdeye/dashboard/views/admin/thirdeye-admin.ftl
+++ b/thirdeye/thirdeye-pinot/src/main/resources/com/linkedin/thirdeye/dashboard/views/admin/thirdeye-admin.ftl
@@ -48,10 +48,10 @@
         .done(function() {
           var metric_config_template = $("#metric-config-template").html();
           metric_config_template_compiled = Handlebars.compile(metric_config_template);
-      
+
           var job_info_template = $("#job-info-template").html();
           job_info_template_compiled = Handlebars.compile(job_info_template);
-      
+
           //register callbacks on tabs
           $('a[data-toggle="tab"]').on('shown.bs.tab', function(e) {
             e.target // newly activated tab
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DataProviderTest.java
index 3cebe9e..7f1934f 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DataProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DataProviderTest.java
@@ -37,6 +37,8 @@ import com.linkedin.thirdeye.datasource.cache.QueryCache;
 import com.linkedin.thirdeye.datasource.csv.CSVThirdEyeDataSource;
 import com.linkedin.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
 import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.ArrayList;
@@ -277,14 +279,14 @@ public class DataProviderTest {
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void testAnomalyInvalid() {
-    this.provider.fetchAnomalies(Collections.singleton(new AnomalySlice()));
+    this.provider.fetchAnomalies(Collections.singleton(new AnomalySlice()), -1);
   }
 
   @Test
   public void testAnomalySingle() {
-    AnomalySlice slice = makeAnomalySlice(-1, 1209000000L, -1, Collections.<String>emptyList());
+    AnomalySlice slice = makeAnomalySlice(1209000000L, -1, Collections.<String>emptyList());
 
-    Collection<MergedAnomalyResultDTO> anomalies = this.provider.fetchAnomalies(Collections.singleton(slice)).get(slice);
+    Collection<MergedAnomalyResultDTO> anomalies = this.provider.fetchAnomalies(Collections.singleton(slice), -1).get(slice);
 
     Assert.assertEquals(anomalies.size(), 1);
     Assert.assertTrue(anomalies.contains(makeAnomaly(this.anomalyIds.get(2), 200L, 604800000L, 1209600000L, Collections.<String>emptyList())));
@@ -292,9 +294,9 @@ public class DataProviderTest {
 
   @Test
   public void testAnomalyDimension() {
-    AnomalySlice slice = makeAnomalySlice(-1, 0, -1, Arrays.asList("a=1", "c=3"));
+    AnomalySlice slice = makeAnomalySlice(0, -1, Arrays.asList("a=1", "c=3"));
 
-    Collection<MergedAnomalyResultDTO> anomalies = this.provider.fetchAnomalies(Collections.singleton(slice)).get(slice);
+    Collection<MergedAnomalyResultDTO> anomalies = this.provider.fetchAnomalies(Collections.singleton(slice), -1).get(slice);
 
     Assert.assertEquals(anomalies.size(), 3);
     Assert.assertTrue(anomalies.contains(makeAnomaly(this.anomalyIds.get(0), 100L, 4000000L, 8000000L, Arrays.asList("a=1", "c=3", "b=2"))));
@@ -361,13 +363,13 @@ public class DataProviderTest {
     return new EventSlice(start, end, filters);
   }
 
-  private static AnomalySlice makeAnomalySlice(long configId, long start, long end, Iterable<String> filterStrings) {
+  private static AnomalySlice makeAnomalySlice(long start, long end, Iterable<String> filterStrings) {
     SetMultimap<String, String> filters = HashMultimap.create();
     for (String fs : filterStrings) {
       String[] parts = fs.split("=");
       filters.put(parts[0], parts[1]);
     }
-    return new AnomalySlice(configId, start, end, filters);
+    return new AnomalySlice(start, end, filters);
   }
 
   private static MetricConfigDTO makeMetric(Long id, String metric, String dataset) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
index 9a47c82..084e4dc 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
@@ -28,6 +28,8 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.EventDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -122,11 +124,14 @@ public class MockDataProvider implements DataProvider {
   }
 
   @Override
-  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices) {
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
     Multimap<AnomalySlice, MergedAnomalyResultDTO> result = ArrayListMultimap.create();
     for (AnomalySlice slice : slices) {
       for (MergedAnomalyResultDTO anomaly : this.anomalies) {
         if (slice.match(anomaly)) {
+          if (configId >= 0 && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)){
+            continue;
+          }
           result.put(slice, anomaly);
         }
       }
@@ -174,6 +179,16 @@ public class MockDataProvider implements DataProvider {
   }
 
   @Override
+  public MetricConfigDTO fetchMetric(String metricName, String datasetName) {
+      for (MetricConfigDTO metric : this.metrics) {
+        if (metricName.equals(metric.getName()) && datasetName.equals(metric.getDataset())) {
+          return metric;
+        }
+      }
+      return null;
+  }
+
+  @Override
   public DetectionPipeline loadPipeline(DetectionConfigDTO config, long start, long end) throws Exception {
     return this.loader.from(this, config, start, end);
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithmTest.java
index b544238..d187bd2 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithmTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/BaselineAlgorithmTest.java
@@ -24,7 +24,6 @@ import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.MockDataProvider;
-import com.linkedin.thirdeye.detection.StaticDetectionPipeline;
 import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
 import java.io.InputStreamReader;
 import java.io.Reader;
@@ -77,7 +76,7 @@ public class BaselineAlgorithmTest {
 
     this.config = new DetectionConfigDTO();
     this.config.setProperties(properties);
-
+    this.config.setId(-1L);
     this.provider = new MockDataProvider()
         .setTimeseries(timeseries)
         .setMetrics(Collections.singletonList(metricConfigDTO));
@@ -101,7 +100,7 @@ public class BaselineAlgorithmTest {
   public void testWeekOverWeekChange() throws Exception {
     this.properties.put(PROP_CHANGE, 0.4);
     this.algorithm = new BaselineAlgorithm(this.provider, this.config, 1814400000L, 2419200000L);
-
+    this.config.setId(-1L);
     DetectionPipelineResult result = this.algorithm.run();
 
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
@@ -118,8 +117,8 @@ public class BaselineAlgorithmTest {
     this.properties.put(PROP_WEEKS, 3);
     this.properties.put(PROP_AGGREGATION, BaselineAggregateType.MEDIAN.toString());
     this.properties.put(PROP_CHANGE, 0.3);
+    this.config.setId(-1L);
     this.algorithm = new BaselineAlgorithm(this.provider, this.config, 1814400000L, 2419200000L);
-
     DetectionPipelineResult result = this.algorithm.run();
 
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithmTest.java
index e7df77e..a5f24b4 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithmTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithmTest.java
@@ -35,7 +35,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
-import static org.testng.Assert.*;
 
 
 public class LegacyAnomalyFunctionAlgorithmTest {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithmTest.java
index 9ec2ccc..75f7d51 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithmTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/MovingWindowAlgorithmTest.java
@@ -103,6 +103,7 @@ public class MovingWindowAlgorithmTest {
 
     this.config = new DetectionConfigDTO();
     this.config.setProperties(properties);
+    this.config.setId(-1L);
 
     this.anomalies = new ArrayList<>();
 
@@ -390,7 +391,7 @@ public class MovingWindowAlgorithmTest {
   //
 
   private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
-    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(null, start, end, METRIC_NAME, DATASET_NAME, Collections.<String, String>emptyMap());
+    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC_NAME, DATASET_NAME, Collections.<String, String>emptyMap());
     anomaly.setMetricUrn("thirdeye:metric:1");
     return anomaly;
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStageTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStageTest.java
index aab8bd1..12cb7c4 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStageTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/stage/BaselineRuleDetectionStageTest.java
@@ -26,7 +26,6 @@ import com.linkedin.thirdeye.detection.DetectionPipeline;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.MockDataProvider;
 import com.linkedin.thirdeye.detection.algorithm.AlgorithmUtils;
-import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.Collections;
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/baseline/MockBaselineProvider.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/baseline/MockBaselineProvider.java
deleted file mode 100644
index 04a7f44..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/baseline/MockBaselineProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.linkedin.thirdeye.detection.baseline;
-
-import com.linkedin.thirdeye.dataframe.DataFrame;
-import com.linkedin.thirdeye.dataframe.util.MetricSlice;
-import com.linkedin.thirdeye.detection.DataProvider;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-public class MockBaselineProvider implements BaselineProvider {
-  @Override
-  public void init(Map<String, Object> properties) {
-
-  }
-
-  @Override
-  public Map<MetricSlice, DataFrame> computeBaselineTimeSeries(Collection<MetricSlice> slices, DataProvider provider) {
-    return provider.fetchTimeseries(slices);
-  }
-
-  @Override
-  public Map<MetricSlice, Double> computeBaselineAggregates(Collection<MetricSlice> slices, DataProvider provider) {
-    Map<MetricSlice, DataFrame> data = provider.fetchAggregates(slices, Collections.EMPTY_LIST);
-    Map<MetricSlice, Double> result = new HashMap<>();
-    for (MetricSlice slice : slices) {
-      result.put(slice, data.get(slice).getDouble(COL_VALUE, 0));
-    }
-    return result;
-  }
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/MockBaselineProvider.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/MockBaselineProvider.java
new file mode 100644
index 0000000..82ef50e
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/MockBaselineProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.components;
+
+import com.linkedin.thirdeye.dataframe.Series;
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.spec.MockBaselineProviderSpec;
+import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
+import com.linkedin.thirdeye.detection.spi.model.TimeSeries;
+
+
+public class MockBaselineProvider implements BaselineProvider<MockBaselineProviderSpec> {
+  private MockBaselineProviderSpec mockSpec;
+
+  @Override
+  public void init(MockBaselineProviderSpec spec, InputDataFetcher dataFetcher) {
+    this.mockSpec = spec;
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    return this.mockSpec.getBaselineTimeseries().get(slice);
+  }
+
+  @Override
+  public Double computePredictedAggregates(MetricSlice slice, Series.DoubleFunction aggregateFunction) {
+    return this.mockSpec.getAggregates().get(slice);
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/baseline/RuleBaselineProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/RuleBaselineProviderTest.java
similarity index 78%
rename from thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/baseline/RuleBaselineProviderTest.java
rename to thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/RuleBaselineProviderTest.java
index 0b7260c..66dec05 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/baseline/RuleBaselineProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/RuleBaselineProviderTest.java
@@ -14,11 +14,14 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.baseline;
+package com.linkedin.thirdeye.detection.components;
 
 import com.linkedin.thirdeye.dataframe.DataFrame;
+import com.linkedin.thirdeye.dataframe.DoubleSeries;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
 import com.linkedin.thirdeye.detection.MockDataProvider;
+import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,7 +33,7 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 
 
 public class RuleBaselineProviderTest {
-  BaselineProvider baselineProvider;
+  RuleBaselineProvider baselineProvider;
   MockDataProvider dataProvider;
   MetricSlice slice1;
   MetricSlice slice2;
@@ -38,11 +41,8 @@ public class RuleBaselineProviderTest {
   @BeforeMethod
   public void setUp() {
     baselineProvider = new RuleBaselineProvider();
-    Map<String, Object> properties = new HashMap<>();
-    properties.put("offset", "wo1w");
     slice1 = MetricSlice.from(1L, 1538520728000L, 1538607128000L);
     slice2 = MetricSlice.from(1L, 1538524800000L, 1538611200000L);
-    baselineProvider.init(properties);
     dataProvider = new MockDataProvider();
     MetricSlice slice1Wow = MetricSlice.from(1L, 1537915928000L, 1538002328000L);
     MetricSlice slice2Wow = MetricSlice.from(1L, 1537920000000L, 1538006400000L);
@@ -54,6 +54,9 @@ public class RuleBaselineProviderTest {
     aggregates.put(slice2Wow, DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE")
         .build()
         .setIndex(COL_TIME));
+    InputDataFetcher dataFetcher = new InputDataFetcher(dataProvider, -1);
+
+    baselineProvider.init(new RuleBaselineProviderSpec("UTC", "wo1w"), dataFetcher);
 
     dataProvider.setTimeseries(Collections.singletonMap(slice1Wow,
         DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE")
@@ -65,7 +68,7 @@ public class RuleBaselineProviderTest {
 
   @Test
   public void testFetchBaselineTimeSeries() {
-    DataFrame df = baselineProvider.computeBaselineTimeSeries(Collections.singleton(slice1), dataProvider).get(slice1);
+    DataFrame df = baselineProvider.computePredictedTimeSeries(slice1).getDataFrame();
     Assert.assertEquals(df.getDoubles(COL_VALUE).get(0), 100.0);
     Assert.assertEquals(df.getDoubles(COL_VALUE).get(1), 200.0);
   }
@@ -74,13 +77,12 @@ public class RuleBaselineProviderTest {
   @Test
   public void testFetchBaselineAggregates() {
     Assert.assertEquals(
-        this.baselineProvider.computeBaselineAggregates(Collections.singleton(slice1), dataProvider).get(slice1), 100.0);
+        this.baselineProvider.computePredictedAggregates(slice1, DoubleSeries.MEAN), 100.0);
   }
 
   @Test
   public void testFetchBaselineAggregatesNaN() {
     Assert.assertEquals(
-        this.baselineProvider.computeBaselineAggregates(Collections.singleton(slice2), dataProvider).get(slice2), Double.NaN);
+        this.baselineProvider.computePredictedAggregates(slice2, DoubleSeries.MEAN), Double.NaN);
   }
-
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
new file mode 100644
index 0000000..eec07af
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.components;
+
+import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.dataframe.DataFrame;
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.DetectionPipelineResult;
+import com.linkedin.thirdeye.detection.DetectionTestUtils;
+import com.linkedin.thirdeye.detection.MockDataProvider;
+import com.linkedin.thirdeye.detection.MockPipeline;
+import com.linkedin.thirdeye.detection.MockPipelineLoader;
+import com.linkedin.thirdeye.detection.MockPipelineOutput;
+import com.linkedin.thirdeye.detection.wrapper.AnomalyFilterWrapper;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+public class ThresholdRuleAnomalyFilterTest {
+  private static final String METRIC_URN = "thirdeye:metric:123";
+
+  private List<MergedAnomalyResultDTO> anomalies;
+  private MockPipelineLoader loader;
+  private List<MockPipeline> runs;
+  private DataProvider testDataProvider;
+  private AnomalyFilterWrapper thresholdRuleFilter;
+  private Map<String, Object> properties;
+  private DetectionConfigDTO config;
+  private Map<String, Object> specs;
+
+  @BeforeMethod
+  public void beforeMethod() {
+    Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
+    aggregates.put(MetricSlice.from(123L, 0, 2),
+        new DataFrame().addSeries(COL_VALUE, 0));
+    aggregates.put(MetricSlice.from(123L, 4, 6),
+        new DataFrame().addSeries(COL_VALUE, 200));
+    aggregates.put(MetricSlice.from(123L, 6, 8),
+        new DataFrame().addSeries(COL_VALUE, 500));
+    aggregates.put(MetricSlice.from(123L, 8, 10),
+        new DataFrame().addSeries(COL_VALUE, 1000));
+
+    MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
+    metricConfigDTO.setId(123L);
+    metricConfigDTO.setName("thirdeye-test");
+    metricConfigDTO.setDataset("thirdeye-test-dataset");
+
+    DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
+    datasetConfigDTO.setId(124L);
+    datasetConfigDTO.setDataset("thirdeye-test-dataset");
+    datasetConfigDTO.setTimeDuration(2);
+    datasetConfigDTO.setTimeUnit(TimeUnit.MILLISECONDS);
+    datasetConfigDTO.setTimezone("UTC");
+
+    this.config = new DetectionConfigDTO();
+    this.config.setId(125L);
+    this.properties = new HashMap<>();
+    this.properties.put("nested", Collections.singletonList(Collections.singletonMap("className", "dummy")));
+    this.properties.put("filter", "$abc");
+
+    this.specs = new HashMap<>();
+    this.specs.put("className", ThresholdRuleAnomalyFilter.class.getName());
+    this.config.setComponentSpecs(ImmutableMap.of("abc", this.specs));
+    this.config.setProperties(this.properties);
+
+    this.anomalies = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6), makeAnomaly(6, 8), makeAnomaly(8, 10));
+
+    this.runs = new ArrayList<>();
+
+    this.loader = new MockPipelineLoader(this.runs, Collections.singletonList(
+        new MockPipelineOutput(this.anomalies, 10)));
+
+    this.testDataProvider = new MockDataProvider()
+        .setLoader(this.loader)
+        .setMetrics(Collections.singletonList(metricConfigDTO))
+        .setDatasets(Collections.singletonList(datasetConfigDTO))
+        .setAggregates(aggregates);
+  }
+
+  @Test(priority = 0)
+  public void testThresholdRuleFilterNone() throws Exception {
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+
+    DetectionPipelineResult result = this.thresholdRuleFilter.run();
+    List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+    Assert.assertEquals(result.getLastTimestamp(), 10);
+    Assert.assertEquals(anomalies.size(), 4);
+    Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
+    Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
+    Assert.assertEquals(anomalies.get(2), this.anomalies.get(2));
+    Assert.assertEquals(anomalies.get(3), this.anomalies.get(3));
+  }
+
+  @Test(priority = 1)
+  public void testThresholdRuleFilterMin() throws Exception {
+    this.specs.put("min", 200);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+
+    DetectionPipelineResult result = this.thresholdRuleFilter.run();
+    List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+    Assert.assertEquals(result.getLastTimestamp(), 10);
+    Assert.assertEquals(anomalies.size(), 3);
+    Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
+    Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
+    Assert.assertEquals(anomalies.get(2), this.anomalies.get(3));
+  }
+
+  @Test(priority = 2)
+  public void testThresholdRuleFilterMax() throws Exception {
+    this.specs.put("max", 500);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+
+    DetectionPipelineResult result = this.thresholdRuleFilter.run();
+    List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+    Assert.assertEquals(result.getLastTimestamp(), 8);
+    Assert.assertEquals(anomalies.size(), 3);
+    Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
+    Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
+    Assert.assertEquals(anomalies.get(2), this.anomalies.get(2));
+  }
+
+  @Test(priority = 3)
+  public void testThresholdRuleFilterBoth() throws Exception {
+    this.specs.put("min", 200);
+    this.specs.put("max", 500);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+
+    DetectionPipelineResult result = this.thresholdRuleFilter.run();
+    List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+    Assert.assertEquals(result.getLastTimestamp(), 8);
+    Assert.assertEquals(anomalies.size(), 2);
+    Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
+    Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
+  }
+
+  private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
+    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end);
+    anomaly.setMetricUrn(METRIC_URN);
+    return anomaly;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/ThresholdRuleDetectorTest.java
similarity index 68%
copy from thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithmTest.java
copy to thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/ThresholdRuleDetectorTest.java
index e7df77e..1ab29f5 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/LegacyAnomalyFunctionAlgorithmTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/ThresholdRuleDetectorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm;
+package com.linkedin.thirdeye.detection.components;
 
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
@@ -23,8 +23,10 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.DetectionPipeline;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.MockDataProvider;
+import com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,18 +37,17 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
-import static org.testng.Assert.*;
 
 
-public class LegacyAnomalyFunctionAlgorithmTest {
+public class ThresholdRuleDetectorTest {
   private DataProvider testDataProvider;
-  private LegacyAnomalyFunctionAlgorithm anomalyFunctionAlgorithm;
+  private DetectionPipeline detectionPipeline;
 
   @BeforeMethod
   public void beforeMethod() throws Exception {
     Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
     timeSeries.put(MetricSlice.from(123L, 0, 10),
-        new DataFrame().addSeries(COL_VALUE, 0, 50, 100, 200, 500, 1000).addSeries(COL_TIME, 0, 1, 2, 4, 6, 8));
+        new DataFrame().addSeries(COL_VALUE, 0, 100, 200, 500, 1000).addSeries(COL_TIME, 0, 2, 4, 6, 8));
 
     MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
     metricConfigDTO.setId(123L);
@@ -62,38 +63,35 @@ public class LegacyAnomalyFunctionAlgorithmTest {
 
     DetectionConfigDTO detectionConfigDTO = new DetectionConfigDTO();
     detectionConfigDTO.setId(125L);
+    Map<String, Object> detectorSpecs = new HashMap<>();
+    detectorSpecs.put("min", 100);
+    detectorSpecs.put("max", 500);
+    detectorSpecs.put("className", ThresholdRuleDetector.class.getName());
     Map<String, Object> properties = new HashMap<>();
-    Map<String, Object> specs = new HashMap<>();
-    specs.put("properties", "min=100;max=500");
-    specs.put("metric", "thirdeye-test");
-    specs.put("bucketSize", 1);
-    specs.put("bucketUnit", "MILLISECONDS");
-    properties.put("specs", specs);
     properties.put("metricUrn", "thirdeye:metric:123");
-    properties.put("anomalyFunctionClassName", "com.linkedin.thirdeye.anomalydetection.function.MinMaxThresholdFunction");
+    properties.put("detector", "$threshold");
     detectionConfigDTO.setProperties(properties);
+    Map<String, Object> componentSpecs = new HashMap<>();
+    componentSpecs.put("threshold", detectorSpecs);
+    detectionConfigDTO.setComponentSpecs(componentSpecs);
 
     this.testDataProvider = new MockDataProvider()
         .setMetrics(Collections.singletonList(metricConfigDTO))
         .setDatasets(Collections.singletonList(datasetConfigDTO))
         .setTimeseries(timeSeries);
-
-    this.anomalyFunctionAlgorithm = new LegacyAnomalyFunctionAlgorithm(this.testDataProvider, detectionConfigDTO, 0, 10);
-
+    this.detectionPipeline = new AnomalyDetectorWrapper(this.testDataProvider, detectionConfigDTO, 0, 10);
   }
 
   @Test
-  public void testRun() throws Exception {
-    DetectionPipelineResult result = this.anomalyFunctionAlgorithm.run();
+  public void testThresholdAlgorithmRun() throws Exception {
+    DetectionPipelineResult result = this.detectionPipeline.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
-    Assert.assertEquals(result.getLastTimestamp(), 9);
+    Assert.assertEquals(result.getLastTimestamp(), 10);
     Assert.assertEquals(anomalies.size(), 2);
-    Assert.assertEquals(anomalies.get(0).getStartTime(), 1);
+    Assert.assertEquals(anomalies.get(0).getStartTime(), 0);
     Assert.assertEquals(anomalies.get(0).getEndTime(), 2);
     Assert.assertEquals(anomalies.get(1).getStartTime(), 8);
-    Assert.assertEquals(anomalies.get(1).getEndTime(), 9);
-    Assert.assertEquals(result.getAnomalies().get(0).getDetectionConfigId(), (Long) 125L);
-    Assert.assertNull(result.getAnomalies().get(0).getFunctionId());
-    Assert.assertNull(result.getAnomalies().get(0).getFunction());
+    Assert.assertEquals(anomalies.get(1).getEndTime(), 10);
   }
-}
+
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithmTest.java
index e75f904..faa7f60 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithmTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/finetune/GridSearchTuningAlgorithmTest.java
@@ -35,7 +35,7 @@ import com.linkedin.thirdeye.datasource.ThirdEyeCacheRegistry;
 import com.linkedin.thirdeye.datasource.ThirdEyeDataSource;
 import com.linkedin.thirdeye.datasource.cache.QueryCache;
 import com.linkedin.thirdeye.datasource.csv.CSVThirdEyeDataSource;
-import com.linkedin.thirdeye.detection.AnomalySlice;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -125,7 +125,7 @@ public class GridSearchTuningAlgorithmTest {
   @Test
   public void testGridSearch() throws Exception {
     AnomalySlice slice = new AnomalySlice().withStart(1525211842000L).withEnd(1527890242000L);
-    gridSearch.fit(slice);
+    gridSearch.fit(slice, -1);
     DetectionConfigDTO config = gridSearch.bestDetectionConfig();
     Assert.assertEquals(MapUtils.getDouble(config.getProperties(), "change"), 0.05);
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
index c34c0de..a04b613 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
@@ -17,14 +17,12 @@
 package com.linkedin.thirdeye.detection.integration;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.HashMultimap;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
-import com.linkedin.thirdeye.detection.AnomalySlice;
 import com.linkedin.thirdeye.detection.DetectionPipeline;
 import com.linkedin.thirdeye.detection.DetectionPipelineLoader;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
@@ -35,7 +33,6 @@ import java.io.InputStreamReader;
 import java.io.Reader;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -124,6 +121,7 @@ public class MergeDimensionThresholdIntegrationTest {
 
     this.config = new DetectionConfigDTO();
     this.config.setProperties(this.properties);
+    this.config.setId(-1L);
   }
 
   @Test
@@ -147,7 +145,7 @@ public class MergeDimensionThresholdIntegrationTest {
       dimensions.put(entry.getKey(), entry.getValue());
     }
 
-    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(null, start, end, METRIC, DATASET, dimensions);
+    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC, DATASET, dimensions);
     anomaly.setMetricUrn(metricUrn);
     return anomaly;
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/AbstractSpecTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/AbstractSpecTest.java
new file mode 100644
index 0000000..70d3aa9
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/AbstractSpecTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.spec;
+
+import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class AbstractSpecTest {
+  @Test
+  public void testAbstractSpecMapping() {
+    TestSpec spec = AbstractSpec.fromProperties(ImmutableMap.of(), TestSpec.class);
+    Assert.assertEquals(spec.getA(), 123);
+    Assert.assertEquals(spec.getB(), 456.7);
+    Assert.assertEquals(spec.getC(), "default");
+  }
+
+  @Test
+  public void testAbstractSpecMapping1() {
+    TestSpec spec = AbstractSpec.fromProperties(ImmutableMap.of("a", 321), TestSpec.class);
+    Assert.assertEquals(spec.getA(), 321);
+    Assert.assertEquals(spec.getB(), 456.7);
+    Assert.assertEquals(spec.getC(), "default");
+  }
+
+  @Test
+  public void testAbstractSpecMapping2() {
+    RuleBaselineProvider provider = new RuleBaselineProvider();
+    TestSpec spec = AbstractSpec.fromProperties(ImmutableMap.of("baselineProvider", provider), TestSpec.class);
+    Assert.assertEquals(spec.getA(), 123);
+    Assert.assertEquals(spec.getBaselineProvider(), provider);
+    Assert.assertEquals(spec.getB(), 456.7);
+    Assert.assertEquals(spec.getC(), "default");
+  }
+
+  @Test
+  public void testAbstractSpecMapping3() {
+    TestSpec spec = AbstractSpec.fromProperties(ImmutableMap.of("a", 321, "className", "org.test.Test"), TestSpec.class);
+    Assert.assertEquals(spec.getA(), 321);
+    Assert.assertEquals(spec.getB(), 456.7);
+    Assert.assertEquals(spec.getC(), "default");
+  }
+
+}
+
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/MockBaselineProviderSpec.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/MockBaselineProviderSpec.java
new file mode 100644
index 0000000..4f179b0
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/MockBaselineProviderSpec.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.spec;
+
+import com.linkedin.thirdeye.dataframe.util.MetricSlice;
+import com.linkedin.thirdeye.detection.spi.model.TimeSeries;
+import java.util.Map;
+
+
+public class MockBaselineProviderSpec extends AbstractSpec {
+  private Map<MetricSlice, TimeSeries> baselineTimeseries;
+  private Map<MetricSlice, Double> aggregates;
+
+  public Map<MetricSlice, TimeSeries> getBaselineTimeseries() {
+    return baselineTimeseries;
+  }
+
+  public void setBaselineTimeseries(Map<MetricSlice, TimeSeries> baselineTimeseries) {
+    this.baselineTimeseries = baselineTimeseries;
+  }
+
+  public Map<MetricSlice, Double> getAggregates() {
+    return aggregates;
+  }
+
+  public void setAggregates(Map<MetricSlice, Double> aggregates) {
+    this.aggregates = aggregates;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/TestSpec.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/TestSpec.java
new file mode 100644
index 0000000..f6ed4b7
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/spec/TestSpec.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.spec;
+
+import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
+
+
+public class TestSpec extends AbstractSpec{
+  private int a = 123;
+  private double b = 456.7;
+  private String c = "default";
+  private RuleBaselineProvider baselineProvider;
+
+  public RuleBaselineProvider getBaselineProvider() {
+    return baselineProvider;
+  }
+
+  public void setBaselineProvider(RuleBaselineProvider baselineProvider) {
+    this.baselineProvider = baselineProvider;
+  }
+
+  public int getA() {
+    return a;
+  }
+
+  public void setA(int a) {
+    this.a = a;
+  }
+
+  public double getB() {
+    return b;
+  }
+
+  public void setB(double b) {
+    this.b = b;
+  }
+
+  public String getC() {
+    return c;
+  }
+
+  public void setC(String c) {
+    this.c = c;
+  }
+}
+
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
new file mode 100644
index 0000000..ce1c03c
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.thirdeye.detection.wrapper;
+
+import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.api.TimeSpec;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import com.linkedin.thirdeye.detection.MockDataProvider;
+import com.linkedin.thirdeye.detection.components.ThresholdRuleDetector;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class AnomalyDetectorWrapperTest {
+  private static final String PROP_METRIC_URN = "metricUrn";
+  private static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
+  private static final String PROP_DETECTOR = "detector";
+
+  private MockDataProvider provider;
+  private Map<String, Object> properties;
+  private DetectionConfigDTO config;
+  private Map<String, Object> stageSpecs;
+
+  @BeforeMethod
+  public void setUp() {
+    this.properties = new HashMap<>();
+    this.properties.put(PROP_METRIC_URN, "thirdeye:metric:1");
+    this.properties.put(PROP_DETECTOR, "$testDetector");
+    this.config = new DetectionConfigDTO();
+    this.config.setComponents(ImmutableMap.of("testDetector", new ThresholdRuleDetector()));
+    this.config.setProperties(properties);
+
+    this.provider = new MockDataProvider();
+    MetricConfigDTO metric = new MetricConfigDTO();
+    metric.setId(1L);
+    metric.setDataset("test");
+    this.provider.setMetrics(Collections.singletonList(metric));
+    DatasetConfigDTO dataset = new DatasetConfigDTO();
+    dataset.setDataset("test");
+    dataset.setTimeUnit(TimeUnit.DAYS);
+    dataset.setTimeDuration(1);
+    this.provider.setDatasets(Collections.singletonList(dataset));
+  }
+
+  @Test
+  public void testMonitoringWindow() {
+    AnomalyDetectorWrapper detectionPipeline =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1538418436000L, 1540837636000L);
+    List<Interval> monitoringWindows = detectionPipeline.getMonitoringWindows();
+    for (Interval window : monitoringWindows) {
+      Assert.assertEquals(window, new Interval(1538418436000L, 1540837636000L));
+    }
+  }
+
+  @Test
+  public void testMovingMonitoringWindow() {
+    this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+    AnomalyDetectorWrapper detectionPipeline =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1540147725000L, 1540493325000L);
+    List<Interval> monitoringWindows = detectionPipeline.getMonitoringWindows();
+    DateTimeZone timeZone = DateTimeZone.forID(TimeSpec.DEFAULT_TIMEZONE);
+    Assert.assertEquals(monitoringWindows,
+        Arrays.asList(new Interval(1540080000000L, 1540166400000L, timeZone), new Interval(1540166400000L, 1540252800000L, timeZone),
+            new Interval(1540252800000L, 1540339200000L, timeZone), new Interval(1540339200000L, 1540425600000L, timeZone)));
+  }
+
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
similarity index 81%
rename from thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/BaselineFillingMergeWrapperTest.java
rename to thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 50d2cb5..8dd3c24 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm;
+package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.collect.ImmutableMap;
 import com.linkedin.thirdeye.dataframe.DataFrame;
@@ -23,11 +23,17 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
+import com.linkedin.thirdeye.detection.InputDataFetcher;
 import com.linkedin.thirdeye.detection.MockDataProvider;
 import com.linkedin.thirdeye.detection.MockPipeline;
 import com.linkedin.thirdeye.detection.MockPipelineLoader;
 import com.linkedin.thirdeye.detection.MockPipelineOutput;
-import com.linkedin.thirdeye.detection.baseline.MockBaselineProvider;
+import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
+import com.linkedin.thirdeye.detection.components.MockBaselineProvider;
+import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
+import com.linkedin.thirdeye.detection.spec.MockBaselineProviderSpec;
+import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
+import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +49,6 @@ import static com.linkedin.thirdeye.detection.DetectionTestUtils.*;
 
 public class BaselineFillingMergeWrapperTest {
   private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
-  private static final String PROP_CURRENT_PROVIDER = "currentValueProvider";
 
   private DetectionConfigDTO config;
   private MergeWrapper wrapper;
@@ -100,9 +105,13 @@ public class BaselineFillingMergeWrapperTest {
         provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates);
 
     this.config.getProperties().put(PROP_MAX_GAP, 100);
-    this.config.getProperties().put(PROP_CURRENT_PROVIDER, ImmutableMap.of("className", MockBaselineProvider.class.getName()));
-    this.config.getProperties().put(PROP_BASELINE_PROVIDER, ImmutableMap.of("className", MockBaselineProvider.class.getName()));
-
+    this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
+    BaselineProvider baselineProvider = new MockBaselineProvider();
+    MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
+    spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
+    InputDataFetcher dataFetcher = new InputDataFetcher(provider, this.config.getId());
+    baselineProvider.init(spec, dataFetcher);
+    this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
     this.wrapper = new BaselineFillingMergeWrapper(provider, this.config, 2900, 3600);
     DetectionPipelineResult output = this.wrapper.run();
 
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/ChildKeepingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
similarity index 98%
rename from thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/ChildKeepingMergeWrapperTest.java
rename to thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 076dfae..4162940 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/algorithm/ChildKeepingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.linkedin.thirdeye.detection.algorithm;
+package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.collect.ImmutableSet;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
@@ -25,6 +25,7 @@ import com.linkedin.thirdeye.detection.MockDataProvider;
 import com.linkedin.thirdeye.detection.MockPipeline;
 import com.linkedin.thirdeye.detection.MockPipelineLoader;
 import com.linkedin.thirdeye.detection.MockPipelineOutput;
+import com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -33,7 +34,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.h2.command.dml.Merge;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java
index 1b86d69..06a7224 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslatorTest.java
@@ -1,28 +1,19 @@
 package com.linkedin.thirdeye.detection.yaml;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
-import com.linkedin.thirdeye.datalayer.bao.DAOTestBase;
-import com.linkedin.thirdeye.datalayer.bao.MetricConfigManager;
+import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
-import com.linkedin.thirdeye.datasource.DAORegistry;
-import com.linkedin.thirdeye.detection.ConfigUtils;
-import com.linkedin.thirdeye.detection.algorithm.BaselineAlgorithm;
-import com.linkedin.thirdeye.detection.algorithm.BaselineFillingMergeWrapper;
-import com.linkedin.thirdeye.detection.algorithm.ChildKeepingMergeWrapper;
-import com.linkedin.thirdeye.detection.algorithm.DimensionWrapper;
-import com.linkedin.thirdeye.detection.algorithm.LegacyMergeWrapper;
-import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
-import com.linkedin.thirdeye.detection.algorithm.stage.AnomalyDetectionStageWrapper;
-import com.linkedin.thirdeye.detection.algorithm.stage.AnomalyFilterStageWrapper;
-import com.linkedin.thirdeye.detection.algorithm.stage.BaselineRuleDetectionStage;
-import com.linkedin.thirdeye.detection.algorithm.stage.BaselineRuleFilterStage;
-import java.util.Arrays;
+import com.linkedin.thirdeye.detection.DataProvider;
+import com.linkedin.thirdeye.detection.MockDataProvider;
+import com.linkedin.thirdeye.detection.annotation.DetectionRegistry;
+import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
+import com.linkedin.thirdeye.detection.components.ThresholdRuleAnomalyFilter;
+import com.linkedin.thirdeye.detection.components.ThresholdRuleDetector;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import org.apache.commons.collections.MapUtils;
+import java.util.concurrent.TimeUnit;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.yaml.snakeyaml.Yaml;
@@ -30,152 +21,57 @@ import org.yaml.snakeyaml.Yaml;
 
 public class CompositePipelineConfigTranslatorTest {
 
-  private DAOTestBase testDAOProvider;
-  private MetricConfigManager metricConfigDAO;
   private Long metricId;
   private Yaml yaml;
-  Map<String, Object> yamlConfig;
+  private Map<String, Object> yamlConfig;
+  private DataProvider provider;
+  private static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   @BeforeMethod
   public void setUp() {
-    this.testDAOProvider = DAOTestBase.getInstance();
-    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
     MetricConfigDTO metricConfig = new MetricConfigDTO();
     metricConfig.setAlias("alias");
     metricConfig.setName("test_metric");
     metricConfig.setDataset("test_dataset");
-    this.metricId = this.metricConfigDAO.save(metricConfig);
+    this.metricId = 1L;
+    metricConfig.setId(metricId);
+    DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
+    datasetConfigDTO.setDataset("test_dataset");
+    datasetConfigDTO.setTimeUnit(TimeUnit.DAYS);
+    datasetConfigDTO.setTimeDuration(1);
     this.yaml = new Yaml();
-    this.yamlConfig = (Map<String, Object>) this.yaml.load(this.getClass().getResourceAsStream("pipeline-config.yaml"));
+    DetectionRegistry.registerComponent(ThresholdRuleDetector.class.getName(), "THRESHOLD");
+    DetectionRegistry.registerComponent(ThresholdRuleAnomalyFilter.class.getName(), "THRESHOLD_RULE_FILTER");
+    DetectionRegistry.registerComponent(RuleBaselineProvider.class.getName(), "RULE_BASELINE");
+    this.provider = new MockDataProvider().setMetrics(Collections.singletonList(metricConfig)).setDatasets(Collections.singletonList(datasetConfigDTO));
   }
 
-  @AfterMethod
-  public void tearDown() {
-    this.testDAOProvider.cleanup();
-  }
 
   @Test
-  public void testBuildDetectionPropertiesMultipleRules() {
-    CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator();
-    Map<String, Object> properties = translator.buildDetectionProperties(this.yamlConfig);
-    List<Map<String, Object>> nestedProperties = ConfigUtils.getList(properties.get("nested"));
-    Map<String, Object> dimensionProperties = nestedProperties.get(0);
-    List<Map<String, Object>> dimensionNestedProperties = ConfigUtils.getList(dimensionProperties.get("nested"));
-    Map<String, Object> ruleFilterWrapperProperties1 = dimensionNestedProperties.get(0);
-    List<Map<String, Object>> ruleMergeWrapperProperties1 =
-        ConfigUtils.getList(ruleFilterWrapperProperties1.get("nested"));
-    List<Map<String, Object>> baselineAlgorithmProperties1 =
-        ConfigUtils.getList(ruleMergeWrapperProperties1.get(0).get("nested"));
-    Map<String, Object> ruleFilterWrapperProperties2 = dimensionNestedProperties.get(1);
-    List<Map<String, Object>> ruleMergeWrapperProperties2 =
-        ConfigUtils.getList(ruleFilterWrapperProperties2.get("nested"));
-    List<Map<String, Object>> baselineAlgorithmProperties2 =
-        ConfigUtils.getList(ruleMergeWrapperProperties2.get(0).get("nested"));
-
-    Assert.assertEquals(properties.get("className"), ChildKeepingMergeWrapper.class.getName());
-    Assert.assertEquals(nestedProperties.size(), 1);
-    Assert.assertEquals(dimensionProperties.get("className"), DimensionWrapper.class.getName());
-    Assert.assertEquals(dimensionProperties.get("metricUrn"),
-        "thirdeye:metric:" + this.metricId + ":D1%3Dv1:D1%3Dv2:D2%3Dv3");
-    Assert.assertEquals(dimensionProperties.get("minContribution"), 0.05);
-    Assert.assertEquals(dimensionProperties.get("dimensions"), Arrays.asList("D1", "D2"));
-    Assert.assertEquals(nestedProperties.size(), 1);
-    Assert.assertEquals(dimensionNestedProperties.size(), 2);
-    Assert.assertEquals(ruleFilterWrapperProperties1.get("className"), AnomalyFilterStageWrapper.class.getName());
-    Assert.assertEquals(ruleFilterWrapperProperties1.get("stageClassName"), BaselineRuleFilterStage.class.getName());
-    Assert.assertEquals(ruleFilterWrapperProperties1.get("specs"), ImmutableMap.of("siteWideImpactThreshold", 0.1));
-    Assert.assertEquals(ruleMergeWrapperProperties1.size(), 1);
-    Assert.assertEquals(ruleMergeWrapperProperties1.get(0).get("className"),
-        BaselineFillingMergeWrapper.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties1.size(), 1);
-    Assert.assertEquals(baselineAlgorithmProperties1.get(0).get("stageClassName"),
-        BaselineRuleDetectionStage.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties1.get(0).get("specs"), ImmutableMap.of("change", 0.3));
-    Assert.assertEquals(ruleFilterWrapperProperties2.get("className"), AnomalyFilterStageWrapper.class.getName());
-    Assert.assertEquals(ruleFilterWrapperProperties2.get("stageClassName"), BaselineRuleFilterStage.class.getName());
-    Assert.assertEquals(ruleFilterWrapperProperties2.get("specs"), ImmutableMap.of("siteWideImpactThreshold", 0.2));
-    Assert.assertEquals(ruleMergeWrapperProperties2.size(), 1);
-    Assert.assertEquals(ruleMergeWrapperProperties2.get(0).get("className"),
-        BaselineFillingMergeWrapper.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties2.size(), 1);
-    Assert.assertEquals(baselineAlgorithmProperties2.get(0).get("stageClassName"),
-        BaselineRuleDetectionStage.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties2.get(0).get("specs"), ImmutableMap.of("change", 0.2));
+  public void testBuildDetectionPropertiesMultipleRules() throws Exception {
+    this.yamlConfig = (Map<String, Object>) this.yaml.load(this.getClass().getResourceAsStream("pipeline-config-1.yaml"));
+    CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator(this.yamlConfig, this.provider);
+    YamlTranslationResult result = translator.translateYaml();
+    YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("compositePipelineTranslatorTestResult-1.json"), YamlTranslationResult.class);
+    Assert.assertEquals(expected, result);
   }
 
   @Test
-  public void testBuildDetectionPropertiesSingleRule() {
-    this.yamlConfig.put("anomalyDetection", Collections.singletonList(
-        ImmutableMap.of("detection", Collections.singletonList(ImmutableMap.of("type", "BASELINE", "change", 0.3)),
-            "filter", Collections.singletonList(ImmutableMap.of("type", "BUSINESS_RULE_FILTER", "siteWideImpactThreshold", 0.1)))));
-    CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator();
-    Map<String, Object> properties = translator.buildDetectionProperties(this.yamlConfig);
-    List<Map<String, Object>> nestedProperties = ConfigUtils.getList(properties.get("nested"));
-    Map<String, Object> dimensionProperties = nestedProperties.get(0);
-    List<Map<String, Object>> dimensionNestedProperties = ConfigUtils.getList(dimensionProperties.get("nested"));
-    Map<String, Object> ruleFilterWrapperProperties1 = dimensionNestedProperties.get(0);
-    List<Map<String, Object>> ruleMergeWrapperProperties1 =
-        ConfigUtils.getList(ruleFilterWrapperProperties1.get("nested"));
-    List<Map<String, Object>> baselineAlgorithmProperties1 =
-        ConfigUtils.getList(ruleMergeWrapperProperties1.get(0).get("nested"));
-
-    Assert.assertEquals(properties.get("className"), ChildKeepingMergeWrapper.class.getName());
-    Assert.assertEquals(nestedProperties.size(), 1);
-    Assert.assertEquals(dimensionProperties.get("className"), DimensionWrapper.class.getName());
-    Assert.assertEquals(dimensionProperties.get("metricUrn"),
-        "thirdeye:metric:" + this.metricId + ":D1%3Dv1:D1%3Dv2:D2%3Dv3");
-    Assert.assertEquals(dimensionProperties.get("minContribution"), 0.05);
-    Assert.assertEquals(dimensionProperties.get("dimensions"), Arrays.asList("D1", "D2"));
-    Assert.assertEquals(nestedProperties.size(), 1);
-    Assert.assertEquals(dimensionNestedProperties.size(), 1);
-    Assert.assertEquals(ruleFilterWrapperProperties1.get("className"), AnomalyFilterStageWrapper.class.getName());
-    Assert.assertEquals(ruleFilterWrapperProperties1.get("stageClassName"), BaselineRuleFilterStage.class.getName());
-    Assert.assertEquals(ruleFilterWrapperProperties1.get("specs"), ImmutableMap.of("siteWideImpactThreshold", 0.1));
-    Assert.assertEquals(ruleMergeWrapperProperties1.size(), 1);
-    Assert.assertEquals(ruleMergeWrapperProperties1.get(0).get("className"),
-        BaselineFillingMergeWrapper.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties1.size(), 1);
-    Assert.assertEquals(baselineAlgorithmProperties1.get(0).get("stageClassName"),
-        BaselineRuleDetectionStage.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties1.get(0).get("specs"), ImmutableMap.of("change", 0.3));
-  }
-
-  @Test
-  public void testBuildDetectionPropertiesNoFilter() {
-    this.yamlConfig.put("anomalyDetection", Collections.singletonList(
-        ImmutableMap.of("detection", Collections.singletonList(ImmutableMap.of("type", "BASELINE", "change", 0.3)))));
-    CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator();
-    Map<String, Object> properties = translator.buildDetectionProperties(this.yamlConfig);
-    List<Map<String, Object>> nestedProperties = ConfigUtils.getList(properties.get("nested"));
-    Map<String, Object> dimensionProperties = nestedProperties.get(0);
-    List<Map<String, Object>> dimensionNestedProperties = ConfigUtils.getList(dimensionProperties.get("nested"));
-    List<Map<String, Object>> ruleMergeWrapperProperties1 = ConfigUtils.getList(dimensionNestedProperties.get(0).get("nested"));
-    Map<String, Object> baselineAlgorithmProperties1 =
-        ruleMergeWrapperProperties1.get(0);
-
-    Assert.assertEquals(properties.get("className"), ChildKeepingMergeWrapper.class.getName());
-    Assert.assertEquals(nestedProperties.size(), 1);
-    Assert.assertEquals(dimensionProperties.get("className"), DimensionWrapper.class.getName());
-    Assert.assertEquals(dimensionProperties.get("metricUrn"),
-        "thirdeye:metric:" + this.metricId + ":D1%3Dv1:D1%3Dv2:D2%3Dv3");
-    Assert.assertEquals(dimensionProperties.get("minContribution"), 0.05);
-    Assert.assertEquals(dimensionProperties.get("dimensions"), Arrays.asList("D1", "D2"));
-    Assert.assertEquals(nestedProperties.size(), 1);
-    Assert.assertEquals(dimensionNestedProperties.get(0).get("className"), BaselineFillingMergeWrapper.class.getName());
-    Assert.assertEquals(dimensionNestedProperties.size(), 1);
-    Assert.assertEquals(ruleMergeWrapperProperties1.size(), 1);
-    Assert.assertEquals(baselineAlgorithmProperties1.get("className"), AnomalyDetectionStageWrapper.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties1.get("stageClassName"),
-        BaselineRuleDetectionStage.class.getName());
-    Assert.assertEquals(baselineAlgorithmProperties1.get("specs"), ImmutableMap.of("change", 0.3));
+  public void testBuildDetectionPropertiesNoFilter() throws Exception {
+    this.yamlConfig = (Map<String, Object>) this.yaml.load(this.getClass().getResourceAsStream("pipeline-config-2.yaml"));
+    CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator(this.yamlConfig, this.provider);
+    YamlTranslationResult result = translator.translateYaml();
+    YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("compositePipelineTranslatorTestResult-2.json"), YamlTranslationResult.class);
+    Assert.assertEquals(expected, result);
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void testBuildDetectionPipelineMissModuleType() {
-    this.yamlConfig.put("anomalyDetection", Collections.singletonList(
-        ImmutableMap.of("detection", Collections.singletonList(ImmutableMap.of("change", 0.3)))));
-    YamlDetectionConfigTranslator translator = new CompositePipelineConfigTranslator();
+    this.yamlConfig = (Map<String, Object>) this.yaml.load(this.getClass().getResourceAsStream("pipeline-config-1.yaml"));
+    this.yamlConfig.put("rules", Collections.singletonList(
+        ImmutableMap.of("name", "rule2","detection", Collections.singletonList(ImmutableMap.of("change", 0.3)))));
+    CompositePipelineConfigTranslator translator = new CompositePipelineConfigTranslator(this.yamlConfig, this.provider);
 
-    translator.generateDetectionConfig(this.yamlConfig);
+    translator.generateDetectionConfig();
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/MockYamlDetectionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/MockYamlDetectionConfigTranslator.java
index 7194059..3c14043 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/MockYamlDetectionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/MockYamlDetectionConfigTranslator.java
@@ -1,15 +1,19 @@
 package com.linkedin.thirdeye.detection.yaml;
 
+import com.linkedin.thirdeye.detection.DataProvider;
 import java.util.HashMap;
 import java.util.Map;
 
 
 public class MockYamlDetectionConfigTranslator extends YamlDetectionConfigTranslator {
+  public MockYamlDetectionConfigTranslator(Map<String, Object> yamlConfig, DataProvider provider) {
+    super(yamlConfig, provider);
+  }
 
   @Override
-  Map<String, Object> buildDetectionProperties(Map<String, Object> yamlConfig) {
+  YamlTranslationResult translateYaml() {
     Map<String, Object> result = new HashMap<>();
     result.put("yamlConfigs", yamlConfig);
-    return result;
+    return new YamlTranslationResult().withProperties(result);
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
index 669c63c..a8b148b 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
@@ -2,6 +2,7 @@ package com.linkedin.thirdeye.detection.yaml;
 
 import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import com.linkedin.thirdeye.detection.alert.filter.ToAllRecipientsDetectionAlertFilter;
+import com.linkedin.thirdeye.detection.annotation.DetectionRegistry;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -51,10 +52,11 @@ public class YamlDetectionAlertConfigTranslatorTest {
 
   @BeforeMethod
   public void setUp() {
+    DetectionRegistry.registerComponent("testclassname", "TO_ALL_RECIPIENTS");
     this.alertYamlConfigs = new HashMap<>();
     alertYamlConfigs.put("name", "test_alert");
     alertYamlConfigs.put("type", "TO_ALL_RECIPIEnts");
-    alertYamlConfigs.put("to", Arrays.asList("jihzhang", "apucher"));
+    alertYamlConfigs.put("to", Arrays.asList("test1", "test2"));
     alertYamlConfigs.put("application", "TestApplication");
     this.translator = new YamlDetectionAlertConfigTranslator();
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslatorTest.java
index 6217396..3a09294 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslatorTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/yaml/YamlDetectionConfigTranslatorTest.java
@@ -5,6 +5,7 @@ import com.linkedin.thirdeye.datalayer.bao.MetricConfigManager;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.datasource.DAORegistry;
+import com.linkedin.thirdeye.detection.MockDataProvider;
 import java.util.HashMap;
 import java.util.Map;
 import org.testng.Assert;
@@ -21,17 +22,17 @@ public class YamlDetectionConfigTranslatorTest {
   public void testGenerateDetectionConfig() {
     Map<String, Object> properties = new HashMap<>();
     properties.put("className", "test.linkedin.thirdeye");
-    YamlDetectionConfigTranslator translator = new MockYamlDetectionConfigTranslator();
 
     Map<String, Object> yamlConfigs = new HashMap<>();
-    yamlConfigs.put("name", "testPipeline");
+    yamlConfigs.put("detectionName", "testPipeline");
     MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
     metricConfigDTO.setName("a_daily_metric");
     metricConfigDTO.setDataset("a_test_dataset");
     metricConfigDTO.setAlias("alias");
     this.metricDAO.save(metricConfigDTO);
 
-    DetectionConfigDTO detectionConfigDTO = translator.generateDetectionConfig(yamlConfigs);
+    YamlDetectionConfigTranslator translator = new MockYamlDetectionConfigTranslator(yamlConfigs, new MockDataProvider());
+    DetectionConfigDTO detectionConfigDTO = translator.generateDetectionConfig();
     Assert.assertEquals(detectionConfigDTO.getName(), "testPipeline");
     Assert.assertEquals(detectionConfigDTO.getCron(), "0 0 14 * * ? *");
     Assert.assertEquals(detectionConfigDTO.getProperties().get("yamlConfigs"), yamlConfigs);
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
new file mode 100644
index 0000000..33d20a0
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
@@ -0,0 +1,61 @@
+{
+  "properties" : {
+    "className" : "com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+    "nested" : [ {
+      "className" : "com.linkedin.thirdeye.detection.algorithm.DimensionWrapper",
+      "metricUrn" : "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+      "nested" : [ {
+        "filter" : "$rule1_THRESHOLD_RULE_FILTER",
+        "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyFilterWrapper",
+        "nested" : [ {
+          "baselineValueProvider" : "$rule1_THRESHOLD_RULE_BASELINE",
+          "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
+          "nested" : [ {
+            "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
+            "detector" : "$rule1_THRESHOLD"
+          } ]
+        } ]
+      }, {
+        "filter" : "$rule2_THRESHOLD_RULE_FILTER",
+        "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyFilterWrapper",
+        "nested" : [ {
+          "baselineValueProvider" : "$rule2_THRESHOLD_RULE_BASELINE",
+          "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
+          "nested" : [ {
+            "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
+            "detector" : "$rule2_THRESHOLD"
+          } ]
+        } ]
+      } ],
+      "minContribution" : 0.05,
+      "dimensions" : [ "D1", "D2" ]
+    } ]
+  },
+  "components" : {
+    "rule1_THRESHOLD" : {
+      "max" : 100,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
+    },
+    "rule2_THRESHOLD" : {
+      "max" : 100,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
+    },
+    "rule1_THRESHOLD_RULE_FILTER" : {
+      "min" : 50,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleAnomalyFilter"
+    },
+    "rule2_THRESHOLD_RULE_BASELINE" : {
+      "max" : 100,
+      "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
+    },
+    "rule1_THRESHOLD_RULE_BASELINE" : {
+      "max" : 100,
+      "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
+    },
+    "rule2_THRESHOLD_RULE_FILTER" : {
+      "min" : 50,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleAnomalyFilter"
+    }
+  },
+  "cron" : "0 0 14 * * ? *"
+}
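
For readers of this fixture: the translated config has two halves. "properties" describes the nested wrapper pipeline (merge wrapper, dimension wrapper, then per-rule filter/baseline/detector wrappers), while "components" maps a generated key of the form <ruleName>_<TYPE> to the implementing class plus its parameters; entries inside "properties" point at components via a "$"-prefixed key such as "$rule1_THRESHOLD". A hypothetical resolution sketch (the names here are illustrative, not the commit's API):

    import java.util.Map;

    public class ComponentRefSketch {
      private static final String PREFIX = "$";

      // Resolve a "$rule1_THRESHOLD"-style reference from the "properties" tree
      // against the "components" map produced by the translator.
      static Map<String, Object> resolve(String reference, Map<String, Map<String, Object>> components) {
        if (!reference.startsWith(PREFIX)) {
          throw new IllegalArgumentException("Not a component reference: " + reference);
        }
        Map<String, Object> spec = components.get(reference.substring(PREFIX.length()));
        if (spec == null) {
          throw new IllegalArgumentException("Unknown component: " + reference);
        }
        // The spec carries the implementing className plus its parameters (e.g. "max": 100).
        return spec;
      }
    }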
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
new file mode 100644
index 0000000..439d622
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
@@ -0,0 +1,30 @@
+{
+  "properties" : {
+    "className" : "com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+    "nested" : [ {
+      "className" : "com.linkedin.thirdeye.detection.algorithm.DimensionWrapper",
+      "metricUrn" : "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+      "nested" : [ {
+        "baselineValueProvider" : "$rule1_THRESHOLD_RULE_BASELINE",
+        "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
+        "nested" : [ {
+          "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
+          "detector" : "$rule1_THRESHOLD"
+        } ]
+      } ],
+      "minContribution" : 0.05,
+      "dimensions" : [ "D1", "D2" ]
+    } ]
+  },
+  "components" : {
+    "rule1_THRESHOLD" : {
+      "max" : 100,
+      "className" : "com.linkedin.thirdeye.detection.components.ThresholdRuleDetector"
+    },
+    "rule1_THRESHOLD_RULE_BASELINE" : {
+      "max" : 100,
+      "className" : "com.linkedin.thirdeye.detection.components.RuleBaselineProvider"
+    }
+  },
+  "cron" : "0 0 14 * * ? *"
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml
new file mode 100644
index 0000000..8b152f8
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml
@@ -0,0 +1,34 @@
+detectionName: testPipeline
+metric: test_metric
+dataset: test_dataset
+pipelineType: Composite
+filters:
+  D1:
+  - v1
+  - v2
+  D2:
+  - v3
+dimensionExploration:
+  dimensions:
+  - D1
+  - D2
+  minContribution: 0.05
+rules:
+- name: rule1
+  detection:
+  - type: THRESHOLD
+    params:
+      max: 100
+  filter:
+  - type: THRESHOLD_RULE_FILTER
+    params:
+      min: 50
+- name: rule2
+  detection:
+  - type: THRESHOLD
+    params:
+      max: 100
+  filter:
+  - type: THRESHOLD_RULE_FILTER
+    params:
+      min: 50
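
Once loaded, this fixture becomes the generic map structure the translator consumes: top-level scalars (detectionName, metric, dataset, pipelineType), nested maps for filters and dimensionExploration, and a "rules" list of maps, each carrying a name plus "detection" and optional "filter" lists. A small sketch of reading that structure, assuming SnakeYAML as in the tests (illustrative only):

    import java.io.InputStream;
    import java.util.List;
    import java.util.Map;
    import org.yaml.snakeyaml.Yaml;

    public class RuleSectionSketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        InputStream in = RuleSectionSketch.class.getResourceAsStream(
            "/com/linkedin/thirdeye/detection/yaml/pipeline-config-1.yaml");
        Map<String, Object> config = (Map<String, Object>) new Yaml().load(in);

        List<Map<String, Object>> rules = (List<Map<String, Object>>) config.get("rules");
        for (Map<String, Object> rule : rules) {
          // e.g. "rule1" with detection [{type=THRESHOLD, params={max=100}}]
          System.out.println(rule.get("name") + " -> " + rule.get("detection"));
        }
      }
    }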
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-2.yaml b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-2.yaml
new file mode 100644
index 0000000..815a77f
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config-2.yaml
@@ -0,0 +1,22 @@
+detectionName: testPipeline
+metric: test_metric
+dataset: test_dataset
+pipelineType: Composite
+filters:
+  D1:
+  - v1
+  - v2
+  D2:
+  - v3
+dimensionExploration:
+  dimensions:
+  - D1
+  - D2
+  minContribution: 0.05
+rules:
+- name: rule1
+  detection:
+  - type: THRESHOLD
+    params:
+      max: 100
+
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config.yaml b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config.yaml
deleted file mode 100644
index cf2563a..0000000
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/pipeline-config.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-name: testPipeline
-metric: test_metric
-dataset: test_dataset
-pipelineType: Composite
-filters:
-  D1:
-  - v1
-  - v2
-  D2:
-  - v3
-dimensionExploration:
-  dimensions:
-  - D1
-  - D2
-  minContribution: 0.05
-anomalyDetection:
-- detection:
-  - type: BASELINE
-    change: 0.3
-  filter:
-  - type: BUSINESS_RULE_FILTER
-    siteWideImpactThreshold: 0.1
-- detection:
-  - type: BASELINE
-    change: 0.2
-  filter:
-  - type: BUSINESS_RULE_FILTER
-    siteWideImpactThreshold: 0.2
-

