Posted to commits@pinot.apache.org by ak...@apache.org on 2020/05/22 01:24:40 UTC

[incubator-pinot] branch master updated: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml (#5398)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 598c12b  [TE] Added generic support for configuring Data Quality rules like SLA from Yaml (#5398)
598c12b is described below

commit 598c12b69effddbab6c011ec2fb6f87f6e03b7b6
Author: Akshay Rai <ak...@linkedin.com>
AuthorDate: Thu May 21 18:24:27 2020 -0700

    [TE] Added generic support for configuring Data Quality rules like SLA from Yaml (#5398)
    
    **Overall Updates:**
    * Added the ability to configure Data Quality (DQ) alerts from YAML. See the sample below.
    * The DQ pipeline leverages and extends the existing detection merging logic.
    * Refactored the code to modularize and reuse the detection translators, and extended them into data quality translators.
    * Updated the DATA_QUALITY task and pipeline runner.
    * Added the following metrics: # of DQ tasks, # of successful DQ tasks, and # of failed DQ tasks.
    
    Note that this PR supports data quality checks only when detection is enabled; standalone data quality checks are out of scope due to current limitations.
    
    Doc Link: Available on the Alert Config page (internal)
    
    **Tests:**
    * Added unit tests covering the translation of DQ YAML into the internal JSON representation.
    * Updated unit tests to validate the Data SLA checker.
    * Tested the data quality/SLA pipeline by deploying it locally.
    
    **Example:**
    ```
    ...
    rules:
    - detection:
       ...
      quality:
      - type: DATA_SLA
        name: slaRule1
        params:
          sla: 1_DAYS
    ```
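    
    For context, a fuller alert configuration combining a detection rule with a quality rule
    might look like the sketch below. The top-level fields (detectionName, metric, dataset) and
    the THRESHOLD rule follow the existing detection YAML conventions; the concrete values are
    illustrative and not taken from this PR's test fixtures.
    ```
    detectionName: pageviews_data_quality
    description: Detect anomalies on pageviews and verify its data SLA
    metric: pageviews
    dataset: pageviews_dataset
    rules:
    - detection:
      - type: THRESHOLD
        name: maxThresholdRule
        params:
          max: 1000000
      quality:
      - type: DATA_SLA
        name: slaRule1
        params:
          sla: 1_DAYS   # flag a DATA_SLA anomaly if data is more than 1 day late
    ```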
---
 .../apache/pinot/thirdeye/anomaly/AnomalyType.java |   2 +-
 .../trigger/DataAvailabilityTaskScheduler.java     |  11 +-
 .../pinot/thirdeye/anomaly/task/TaskConstants.java |   2 +-
 .../thirdeye/anomaly/task/TaskInfoFactory.java     |   2 +-
 .../thirdeye/anomaly/task/TaskRunnerFactory.java   |   6 +-
 .../anomaly/utils/ThirdeyeMetricsUtil.java         |  12 +-
 .../thirdeye/constant/AnomalyResultSource.java     |   1 +
 .../datalayer/pojo/DetectionConfigBean.java        |  12 +-
 .../pinot/thirdeye/detection/DetectionUtils.java   |  17 +-
 .../alert/StatefulDetectionAlertFilter.java        |  12 +-
 .../DataQualityPipelineJob.java}                   |  20 +-
 .../dataquality/DataQualityPipelineTaskRunner.java | 143 +++++
 .../components/DataSlaQualityChecker.java          | 216 +++++++
 .../spec/DataSlaQualityCheckerSpec.java}           |  35 +-
 .../dataquality/wrapper/DataSlaWrapper.java        | 103 ++++
 .../detection/datasla/DatasetSlaTaskRunner.java    | 320 ----------
 .../validators/DetectionConfigValidator.java       |   1 +
 .../wrapper/BaselineFillingMergeWrapper.java       |   2 +
 .../wrapper/ChildKeepingMergeWrapper.java          |   6 +-
 .../detection/wrapper/DataQualityMergeWrapper.java |  99 ++++
 .../yaml/translator/DetectionConfigTranslator.java | 398 +------------
 .../translator/DetectionMetricAttributeHolder.java |  29 +-
 .../builder/DataQualityPropertiesBuilder.java      | 139 +++++
 .../builder/DetectionConfigPropertiesBuilder.java  | 222 +++++++
 .../builder/DetectionPropertiesBuilder.java        | 243 ++++++++
 .../formatter/DetectionConfigFormatter.java        |   8 +-
 .../content/BaseNotificationContent.java           |   8 +-
 .../thirdeye/scheduler/DetectionCronScheduler.java |  71 ++-
 .../scheduler/SubscriptionCronScheduler.java       |  19 +-
 .../thirdeye/scheduler/ThirdEyeCronScheduler.java  |   6 +-
 .../trigger/DataAvailabilityTaskSchedulerTest.java |   4 +-
 .../pinot/thirdeye/detection/MockDataProvider.java |  13 +
 .../dataquality/DataQualityTaskRunnerTest.java     | 642 +++++++++++++++++++++
 .../datasla/DatasetSlaTaskRunnerTest.java          | 478 ---------------
 .../DetectionConfigSlaTranslatorTest.java          | 121 ++++
 .../yaml/translator/YamlTranslationResult.java     |   9 +
 .../templates/TestMetricAnomaliesContent.java      |  10 +-
 .../channels/TestJiraContentFormatter.java         |   4 +-
 .../tools/RunAdhocDatabaseQueriesTool.java         |   4 +-
 .../detection/dataquality/sla-config-0.yaml        |  17 +
 .../detection/dataquality/sla-config-1.yaml        |  17 +
 .../detection/dataquality/sla-config-2.yaml        |  17 +
 .../detection/dataquality/sla-config-3.yaml        |  17 +
 .../detection/yaml/translator/sla-config-1.yaml    |  17 +
 .../detection/yaml/translator/sla-config-2.yaml    |  30 +
 .../detection/yaml/translator/sla-config-3.yaml    |  65 +++
 .../detection/yaml/translator/sla-config-4.yaml    |  33 ++
 .../detection/yaml/translator/sla-config-5.yaml    |  90 +++
 .../yaml/translator/sla-config-translated-1.json   |  23 +
 .../yaml/translator/sla-config-translated-2.json   |  23 +
 .../yaml/translator/sla-config-translated-3.json   |  57 ++
 .../yaml/translator/sla-config-translated-4.json   |  26 +
 .../yaml/translator/sla-config-translated-5.json   | 103 ++++
 .../resources/test-jira-anomalies-template.ftl     |   4 +-
 .../resources/test-metric-anomalies-template.html  |   6 +-
 55 files changed, 2692 insertions(+), 1303 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java
index 364f443..5fa2c2f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java
@@ -27,7 +27,7 @@ public enum AnomalyType {
   // There is a trend change for underline metric.
   TREND_CHANGE ("Trend Change"),
   // The metric is not available within specified time.
-  DATA_MISSING ("Data Missing");
+  DATA_SLA ("SLA Violation");
 
   private String label;
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
index c23e40e..d05bfa8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
@@ -122,9 +122,10 @@ public class DataAvailabilityTaskScheduler implements Runnable {
             long endtime = System.currentTimeMillis();
             createDetectionTask(detectionConfig, endtime);
 
-            if (DetectionUtils.isDataAvailabilityCheckEnabled(detectionConfig)) {
-              createDataSLACheckTask(detectionConfig, endtime);
-              LOG.info("Scheduling a task for data availability {} due to the fallback mechanism.", detectionConfigId);
+            if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) {
+              createDataQualityTask(detectionConfig, endtime);
+              LOG.info("Scheduling a task for data sla check on detection config {} due to the fallback mechanism.",
+                  detectionConfigId);
             }
 
             detectionIdToLastTaskEndTimeMap.put(detectionConfig.getId(), endtime);
@@ -213,8 +214,8 @@ public class DataAvailabilityTaskScheduler implements Runnable {
     return createTask(TaskConstants.TaskType.DETECTION, detectionConfig, end);
   }
 
-  private long createDataSLACheckTask(DetectionConfigDTO detectionConfig, long end) throws JsonProcessingException {
-    return createTask(TaskConstants.TaskType.DATA_SLA, detectionConfig, end);
+  private long createDataQualityTask(DetectionConfigDTO detectionConfig, long end) throws JsonProcessingException {
+    return createTask(TaskConstants.TaskType.DATA_QUALITY, detectionConfig, end);
   }
 
   private void loadLatestTaskCreateTime(DetectionConfigDTO detectionConfig) throws Exception {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
index 206c8bb..728c1b8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
@@ -22,7 +22,7 @@ package org.apache.pinot.thirdeye.anomaly.task;
 public class TaskConstants {
 
   public enum TaskType {
-    DATA_SLA,
+    DATA_QUALITY,
     DETECTION,
     DETECTION_ALERT,
     YAML_DETECTION_ONBOARD,
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java
index 6a3a19a..08dd300 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java
@@ -46,7 +46,7 @@ public class TaskInfoFactory {
     TaskInfo taskInfo = null;
     try {
       switch(taskType) {
-        case DATA_SLA:
+        case DATA_QUALITY:
           taskInfo = OBJECT_MAPPER.readValue(taskInfoString, DetectionPipelineTaskInfo.class);
           break;
         case DETECTION:
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java
index fe160f7..4f2012b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java
@@ -27,7 +27,7 @@ import org.apache.pinot.thirdeye.anomaly.task.TaskConstants.TaskType;
 import org.apache.pinot.thirdeye.completeness.checker.DataCompletenessTaskRunner;
 import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskRunner;
 import org.apache.pinot.thirdeye.detection.alert.DetectionAlertTaskRunner;
-import org.apache.pinot.thirdeye.detection.datasla.DatasetSlaTaskRunner;
+import org.apache.pinot.thirdeye.detection.dataquality.DataQualityPipelineTaskRunner;
 import org.apache.pinot.thirdeye.detection.onboard.YamlOnboardingTaskRunner;
 
 
@@ -39,8 +39,8 @@ public class TaskRunnerFactory {
   public static TaskRunner getTaskRunnerFromTaskType(TaskType taskType) {
     TaskRunner taskRunner = null;
     switch (taskType) {
-      case DATA_SLA:
-        taskRunner = new DatasetSlaTaskRunner();
+      case DATA_QUALITY:
+        taskRunner = new DataQualityPipelineTaskRunner();
         break;
       case DETECTION:
         taskRunner = new DetectionPipelineTaskRunner();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
index d6de6c2..cffdd26 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
@@ -62,15 +62,21 @@ public class ThirdeyeMetricsUtil {
   public static final Counter detectionTaskCounter =
       metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "detectionTaskCounter");
 
-  public static final Counter dataAvailabilityTaskCounter =
-      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataAvailabilityTaskCounter");
-
   public static final Counter detectionTaskSuccessCounter =
       metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "detectionTaskSuccessCounter");
 
   public static final Counter detectionTaskExceptionCounter =
       metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "detectionTaskExceptionCounter");
 
+  public static final Counter dataQualityTaskCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataQualityTaskCounter");
+
+  public static final Counter dataQualityTaskSuccessCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataQualityTaskSuccessCounter");
+
+  public static final Counter dataQualityTaskExceptionCounter =
+      metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataQualityTaskExceptionCounter");
+
   public static final Counter alertTaskCounter =
       metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "alertTaskCounter");
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java
index fa41686..6ce3a9d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.thirdeye.constant;
 
 public enum AnomalyResultSource {
   DEFAULT_ANOMALY_DETECTION,
+  DATA_QUALITY_DETECTION,
   ANOMALY_REPLAY,
   USER_LABELED_ANOMALY
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java
index bb587a0..0a0a700 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java
@@ -47,7 +47,7 @@ public class DetectionConfigBean extends AbstractBean {
   List<String> owners;
 
   // Stores properties related to data SLA rules for every metric
-  Map<String, Object> dataSLAProperties;
+  Map<String, Object> dataQualityProperties;
 
   boolean isDataAvailabilitySchedule;
   long taskTriggerFallBackTimeInSec;
@@ -133,12 +133,12 @@ public class DetectionConfigBean extends AbstractBean {
     this.active = active;
   }
 
-  public Map<String, Object> getDataSLAProperties() {
-    return dataSLAProperties;
+  public Map<String, Object> getDataQualityProperties() {
+    return dataQualityProperties;
   }
 
-  public void setDataSLAProperties(Map<String, Object> dataSLAProperties) {
-    this.dataSLAProperties = dataSLAProperties;
+  public void setDataQualityProperties(Map<String, Object> dataQualityProperties) {
+    this.dataQualityProperties = dataQualityProperties;
   }
 
   public boolean isDataAvailabilitySchedule() {
@@ -176,7 +176,7 @@ public class DetectionConfigBean extends AbstractBean {
     DetectionConfigBean that = (DetectionConfigBean) o;
     return lastTimestamp == that.lastTimestamp && active == that.active && Objects.equals(cron, that.cron)
         && Objects.equals(name, that.name) && Objects.equals(properties, that.properties) && Objects.equals(yaml,
-        that.yaml) && Objects.equals(dataSLAProperties, that.dataSLAProperties)
+        that.yaml) && Objects.equals(dataQualityProperties, that.dataQualityProperties)
         && Objects.equals(isDataAvailabilitySchedule, that.isDataAvailabilitySchedule) && Objects
         .equals(taskTriggerFallBackTimeInSec, that.taskTriggerFallBackTimeInSec);
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
index d993e28..4352d1a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
@@ -166,10 +166,13 @@ public class DetectionUtils {
    * @return anomaly template
    */
   public static MergedAnomalyResultDTO makeAnomaly(MetricSlice slice) {
-    MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
-    anomaly.setStartTime(slice.getStart());
-    anomaly.setEndTime(slice.getEnd());
+    return makeAnomaly(slice.getStart(), slice.getEnd());
+  }
 
+  public static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
+    MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+    anomaly.setStartTime(start);
+    anomaly.setEndTime(end);
     return anomaly;
   }
 
@@ -344,11 +347,11 @@ public class DetectionUtils {
   }
 
   /**
-   * Verify if this detection has data availability checks enabled
+   * Verify if this detection has data quality checks enabled
    */
-  public static boolean isDataAvailabilityCheckEnabled(DetectionConfigDTO detectionConfig) {
-    return detectionConfig.getDataSLAProperties() != null
-        && !detectionConfig.getDataSLAProperties().isEmpty();
+  public static boolean isDataQualityCheckEnabled(DetectionConfigDTO detectionConfig) {
+    return detectionConfig.getDataQualityProperties() != null
+        && !detectionConfig.getDataQualityProperties().isEmpty();
   }
 
   public static long makeTimeout(long deadline) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
index b099677..4437c0f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
@@ -78,12 +78,12 @@ public abstract class StatefulDetectionAlertFilter extends DetectionAlertFilter
       Collection<MergedAnomalyResultDTO> anomalies =
           Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
             @Override
-            public boolean apply(@Nullable MergedAnomalyResultDTO mergedAnomalyResultDTO) {
-              return mergedAnomalyResultDTO != null
-                  && !mergedAnomalyResultDTO.isChild()
-                  && !AlertUtils.hasFeedback(mergedAnomalyResultDTO)
-                  && mergedAnomalyResultDTO.getCreatedTime() > finalStartTime
-                  && mergedAnomalyResultDTO.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION);
+            public boolean apply(@Nullable MergedAnomalyResultDTO anomaly) {
+              return anomaly != null && !anomaly.isChild()
+                  && !AlertUtils.hasFeedback(anomaly)
+                  && anomaly.getCreatedTime() > finalStartTime
+                  && (anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION) ||
+                  anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION));
             }
           });
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionDataSLAJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
similarity index 76%
rename from thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionDataSLAJob.java
rename to thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
index b63a412..13a2989 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionDataSLAJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pinot.thirdeye.detection;
+package org.apache.pinot.thirdeye.detection.dataquality;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,14 +26,20 @@ import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.TaskUtils;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DetectionDataSLAJob implements Job {
-  private static final Logger LOG = LoggerFactory.getLogger(DetectionDataSLAJob.class);
+/**
+ * The data quality job submitted to the scheduler. This job creates data quality tasks which
+ * the runners will later pick up and execute.
+ */
+public class DataQualityPipelineJob implements Job {
+  private static final Logger LOG = LoggerFactory.getLogger(DataQualityPipelineJob.class);
 
   private final TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
 
@@ -45,10 +51,10 @@ public class DetectionDataSLAJob implements Job {
     DetectionPipelineTaskInfo taskInfo = TaskUtils.buildTaskInfo(jobExecutionContext);
 
     // if a task is pending and not time out yet, don't schedule more
-    String jobName = String.format("%s_%d", TaskConstants.TaskType.DATA_SLA, taskInfo.configId);
+    String jobName = String.format("%s_%d", TaskConstants.TaskType.DATA_QUALITY, taskInfo.getConfigId());
     if (TaskUtils.checkTaskAlreadyRun(jobName, taskInfo, DATA_AVAILABILITY_TASK_TIMEOUT)) {
       LOG.info("Skip scheduling {} task for {} with start time {}. Task is already in the queue.",
-          TaskConstants.TaskType.DATA_SLA, jobName, taskInfo.getStart());
+          TaskConstants.TaskType.DATA_QUALITY, jobName, taskInfo.getStart());
       return;
     }
 
@@ -59,9 +65,9 @@ public class DetectionDataSLAJob implements Job {
       LOG.error("Exception when converting DetectionPipelineTaskInfo {} to jsonString", taskInfo, e);
     }
 
-    TaskDTO taskDTO = TaskUtils.buildTask(taskInfo.configId, taskInfoJson, TaskConstants.TaskType.DATA_SLA);
+    TaskDTO taskDTO = TaskUtils.buildTask(taskInfo.getConfigId(), taskInfoJson, TaskConstants.TaskType.DATA_QUALITY);
     long taskId = taskDAO.save(taskDTO);
-    LOG.info("Created {} task {} with taskId {}", TaskConstants.TaskType.DATA_SLA, taskDTO, taskId);
+    LOG.info("Created {} task {} with taskId {}", TaskConstants.TaskType.DATA_QUALITY, taskDTO, taskId);
   }
 }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineTaskRunner.java
new file mode 100644
index 0000000..80081bf
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineTaskRunner.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
+import org.apache.pinot.thirdeye.anomaly.task.TaskInfo;
+import org.apache.pinot.thirdeye.anomaly.task.TaskResult;
+import org.apache.pinot.thirdeye.anomaly.task.TaskRunner;
+import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is responsible for running the data quality tasks.
+ */
+public class DataQualityPipelineTaskRunner implements TaskRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(DataQualityPipelineTaskRunner.class);
+  private final DetectionConfigManager detectionDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final DetectionPipelineLoader loader;
+  private final DataProvider provider;
+
+  /**
+   * Default constructor for ThirdEye task execution framework.
+   * Loads dependencies from DAORegistry and CacheRegistry.
+   *
+   * @see DAORegistry
+   * @see ThirdEyeCacheRegistry
+   */
+  public DataQualityPipelineTaskRunner() {
+    this.loader = new DetectionPipelineLoader();
+    this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    EventManager eventDAO = DAORegistry.getInstance().getEventDAO();
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricDAO, datasetDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(), ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricDAO, datasetDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.provider = new DefaultDataProvider(metricDAO, datasetDAO, eventDAO, this.anomalyDAO, this.evaluationDAO,
+        timeseriesLoader, aggregationLoader, new DetectionPipelineLoader(), TimeSeriesCacheBuilder.getInstance(),
+        AnomaliesCacheBuilder.getInstance());
+  }
+
+  public DataQualityPipelineTaskRunner(DetectionConfigManager detectionDAO, MergedAnomalyResultManager anomalyDAO,
+      EvaluationManager evaluationDAO, DetectionPipelineLoader loader, DataProvider provider) {
+    this.detectionDAO = detectionDAO;
+    this.anomalyDAO = anomalyDAO;
+    this.evaluationDAO = evaluationDAO;
+    this.loader = loader;
+    this.provider = provider;
+  }
+
+  @Override
+  public List<TaskResult> execute(TaskInfo taskInfo, TaskContext taskContext) throws Exception {
+    ThirdeyeMetricsUtil.dataQualityTaskCounter.inc();
+
+    try {
+      DetectionPipelineTaskInfo info = (DetectionPipelineTaskInfo) taskInfo;
+      DetectionConfigDTO config = this.detectionDAO.findById(info.getConfigId());
+      if (config == null) {
+        throw new IllegalArgumentException(String.format("Could not resolve config id %d", info.getConfigId()));
+      }
+
+      LOG.info("Start data quality check for config {} between {} and {}", config.getId(), info.getStart(), info.getEnd());
+      Map<String, Object> props =  config.getProperties();
+      // A small hack to reuse the properties field to run the data quality pipeline; this is reverted after the run.
+      config.setProperties(config.getDataQualityProperties());
+      DetectionPipeline pipeline = this.loader.from(this.provider, config, info.getStart(), info.getEnd());
+      DetectionPipelineResult result = pipeline.run();
+      // revert the properties field back to detection properties
+      config.setProperties(props);
+
+      // Save all the data quality anomalies
+      for (MergedAnomalyResultDTO mergedAnomalyResultDTO : result.getAnomalies()) {
+        this.anomalyDAO.save(mergedAnomalyResultDTO);
+        if (mergedAnomalyResultDTO.getId() == null) {
+          LOG.warn("Could not store anomaly:\n{}", mergedAnomalyResultDTO);
+        }
+      }
+
+      ThirdeyeMetricsUtil.dataQualityTaskSuccessCounter.inc();
+      LOG.info("End data quality check for config {} between {} and {}. Detected {} anomalies.", config.getId(),
+          info.getStart(), info.getEnd(), result.getAnomalies().size());
+      return Collections.emptyList();
+    } catch(Exception e) {
+      ThirdeyeMetricsUtil.dataQualityTaskExceptionCounter.inc();
+      throw e;
+    }
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
new file mode 100644
index 0000000..42d6860
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_SLA anomalies.
+ *
+ * The data SLA is verified based on the following, in order:
+ * a. The dataset refresh timestamp updated by the event-based data availability pipeline (if applicable).
+ * b. Otherwise, by querying the data source directly and running the sla checks.
+ */
+@Components(title = "Data Sla Quality Checker",
+    type = "DATA_SLA",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Checks if data is missing or not based on the configured sla",
+    presentation = {
+        @PresentationOption(name = "data sla", template = "is ${sla}")
+    },
+    params = {
+        @Param(name = "sla", placeholder = "value")
+    })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+  private String sla;
+  private InputDataFetcher dataFetcher;
+  private final String DEFAULT_DATA_SLA = "3_DAYS";
+
+  @Override
+  public DetectionResult runDetection(Interval window, String metricUrn) {
+    return DetectionResult.from(runSLACheck(MetricEntity.fromURN(metricUrn), window));
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    return TimeSeries.empty();
+  }
+
+  @Override
+  public void init(DataSlaQualityCheckerSpec spec, InputDataFetcher dataFetcher) {
+    this.sla = spec.getSla();
+    this.dataFetcher = dataFetcher;
+  }
+
+  /**
+   * Runs the data sla check for the window on the given metric.
+   */
+  private List<MergedAnomalyResultDTO> runSLACheck(MetricEntity me, Interval window) {
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+    long expectedDatasetRefreshTime = window.getEnd().getMillis();
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+        .withMetricIdsForDataset(Collections.singletonList(me.getId()))
+        .withMetricIds(Collections.singletonList(me.getId())));
+    DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+
+    try {
+      long datasetLastRefreshTime = fetchLatestDatasetRefreshTime(data, me, window);
+      MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, expectedDatasetRefreshTime);
+
+      if (isSLAViolated(datasetLastRefreshTime, expectedDatasetRefreshTime)) {
+        anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+      }
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
+    }
+
+    return anomalies;
+  }
+
+  /**
+   * Fetches the latest timestamp for the dataset. It relies on 2 sources:
+   * a. Data-trigger/availability based refresh timestamp
+   * b. If not available, the timestamp obtained by querying the data source directly.
+   */
+  private long fetchLatestDatasetRefreshTime(InputData data, MetricEntity me, Interval window) {
+    long startTime = window.getStart().getMillis();
+    long endTime = window.getEnd().getMillis();
+    // Note that we only measure the overall dataset availability. Filters are not considered as the
+    // data availability events fire at the dataset level.
+    MetricSlice metricSlice = MetricSlice.from(me.getId(), startTime, endTime, ArrayListMultimap.create());
+
+    // Fetch dataset refresh time based on the data availability events
+    DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+    long eventBasedRefreshTime = datasetConfig.getLastRefreshTime();
+    if (eventBasedRefreshTime <= 0) {
+      // no availability event -> assume we have processed data till the current detection start
+      eventBasedRefreshTime = startTime - 1;
+    }
+
+    // If the data availability event indicates no data or partial data, we will confirm with the data source.
+    if (eventBasedRefreshTime < startTime || isPartialData(eventBasedRefreshTime, endTime, datasetConfig)) {
+      // Double check with data source. This can happen if,
+      // 1. This dataset/source does not support data trigger/availability events
+      // 2. The data trigger event didn't arrive due to some upstream issue.
+      DataFrame dataFrame = this.dataFetcher.fetchData(new InputDataSpec()
+          .withTimeseriesSlices(Collections.singletonList(metricSlice))).getTimeseries().get(metricSlice);
+      if (dataFrame != null && !dataFrame.isEmpty()) {
+        return dataFrame.getDoubles("timestamp").max().longValue();
+      }
+    }
+
+    return eventBasedRefreshTime;
+  }
+
+  /**
+   * We say the data is partial if we do not have all the data points in the sla detection window;
+   * more specifically, if at least one data point is missing in the sla detection window.
+   *
+   * For example:
+   * Assume that the data is delayed and our current sla detection window is [1st Feb to 3rd Feb). During this scan,
+   * let's say data for 1st Feb arrives. Now, we have a situation where partial data is present. In other words, in the
+   * current sla detection window [1st to 3rd Feb) we have data for 1st Feb but data for 2nd Feb is missing.
+   *
+   * Based on this information, we can smartly decide if we need to query the data source or not.
+   */
+  private boolean isPartialData(long actual, long expected, DatasetConfigDTO datasetConfig) {
+    long granularity = datasetConfig.bucketTimeGranularity().toMillis();
+    return (expected - actual) * 1.0 / granularity > 1;
+  }
+
+  /**
+   * Checks whether the data is delayed based on the user-specified SLA configuration.
+   *
+   * Fetches the user-configured SLA, defaulting to 3_DAYS if not set.
+   */
+  private boolean isSLAViolated(long actualRefreshTime, long expectedRefreshTime) {
+    String sla = StringUtils.isNotEmpty(this.sla) ? this.sla : DEFAULT_DATA_SLA;
+    long delay = TimeGranularity.fromString(sla).toPeriod().toStandardDuration().getMillis();
+
+    return (expectedRefreshTime - actualRefreshTime) > delay;
+  }
+
+  /**
+   * Align and round off start time to the upper boundary of the granularity
+   */
+  private static long alignToUpperBoundary(long timestamp, DatasetConfigDTO datasetConfig) {
+    Period granularityPeriod = datasetConfig.bucketTimeGranularity().toPeriod();
+    DateTimeZone timezone = DateTimeZone.forID(datasetConfig.getTimezone());
+    DateTime startTime = new DateTime(timestamp - 1, timezone).plus(granularityPeriod);
+    return (startTime.getMillis() / granularityPeriod.toStandardDuration().getMillis()) * granularityPeriod.toStandardDuration().getMillis();
+  }
+
+  /**
+   * Creates a DATA_SLA anomaly from ceiling(start) to ceiling(end) for the detection id.
+   * If existing DATA_SLA anomalies are present, the new anomaly will be merged with them.
+   */
+  private MergedAnomalyResultDTO createDataSLAAnomaly(MetricSlice slice, DatasetConfigDTO datasetConfig) {
+    MergedAnomalyResultDTO anomaly = DetectionUtils.makeAnomaly(
+        alignToUpperBoundary(slice.getStart(), datasetConfig),
+        alignToUpperBoundary(slice.getEnd(), datasetConfig));
+    anomaly.setCollection(datasetConfig.getName());
+    anomaly.setType(AnomalyType.DATA_SLA);
+    anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
+
+    // Store the metadata in the anomaly
+    Map<String, String> properties = new HashMap<>();
+    properties.put("datasetLastRefreshTime", String.valueOf(slice.getStart() - 1));
+    properties.put("sla", sla);
+    anomaly.setProperties(properties);
+
+    return anomaly;
+  }
+}
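
To make the SLA arithmetic above concrete: isSLAViolated flags a violation when the dataset
watermark lags the expected refresh time by more than the configured delay. Below is a minimal,
self-contained sketch of that comparison using plain java.time; the parseSla helper is a
simplified stand-in for ThirdEye's TimeGranularity.fromString, an assumption for illustration
rather than the library's parser.
```
import java.time.Duration;

public class SlaCheckSketch {
  // Parse strings like "1_DAYS" or "12_HOURS" into a Duration (simplified stand-in
  // for TimeGranularity.fromString; only a couple of units are handled here).
  static Duration parseSla(String sla) {
    String[] parts = sla.split("_");
    long size = Long.parseLong(parts[0]);
    switch (parts[1]) {
      case "DAYS":  return Duration.ofDays(size);
      case "HOURS": return Duration.ofHours(size);
      default: throw new IllegalArgumentException("Unsupported unit: " + parts[1]);
    }
  }

  // The SLA is violated when the watermark (actual) lags the expected refresh
  // time by more than the allowed delay, mirroring isSLAViolated in the diff.
  static boolean isSlaViolated(long actualRefreshMillis, long expectedRefreshMillis, String sla) {
    long allowedDelayMillis = parseSla(sla).toMillis();
    return (expectedRefreshMillis - actualRefreshMillis) > allowedDelayMillis;
  }

  public static void main(String[] args) {
    long expected = 1_590_105_600_000L;                      // e.g. the detection window end
    long actual = expected - Duration.ofDays(2).toMillis();  // watermark lagging by 2 days
    System.out.println(isSlaViolated(actual, expected, "1_DAYS")); // true: more than 1 day late
    System.out.println(isSlaViolated(actual, expected, "3_DAYS")); // false: within the 3-day default
  }
}
```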
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
similarity index 62%
copy from thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
copy to thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
index 206c8bb..edfbc19 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
@@ -17,31 +17,22 @@
  * under the License.
  */
 
-package org.apache.pinot.thirdeye.anomaly.task;
+package org.apache.pinot.thirdeye.detection.dataquality.spec;
 
-public class TaskConstants {
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.detection.spec.AbstractSpec;
 
-  public enum TaskType {
-    DATA_SLA,
-    DETECTION,
-    DETECTION_ALERT,
-    YAML_DETECTION_ONBOARD,
-    ANOMALY_DETECTION,
-    MERGE,
-    // TODO: deprecate ALERT task type
-    ALERT,
-    ALERT2,
-    MONITOR,
-    DATA_COMPLETENESS,
-    CLASSIFICATION,
-    REPLAY
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DataSlaQualityCheckerSpec extends AbstractSpec {
+  private String sla = "3_DAYS";
+
+  public String getSla() {
+    return sla;
   }
 
-  public enum TaskStatus {
-    WAITING,
-    RUNNING,
-    COMPLETED,
-    FAILED,
-    TIMEOUT
+  public void setSla(String sla) {
+    this.sla = sla;
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/wrapper/DataSlaWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/wrapper/DataSlaWrapper.java
new file mode 100644
index 0000000..b0cb0e7
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/wrapper/DataSlaWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.wrapper;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator.*;
+
+
+/**
+ * Wrapper class that is responsible for running the {@link DataSlaQualityChecker}.
+ */
+public class DataSlaWrapper extends DetectionPipeline {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaWrapper.class);
+
+  private static final String PROP_METRIC_URN = "metricUrn";
+  private static final String PROP_QUALITY_CHECK = "qualityCheck";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
+
+  private final AnomalyDetector qualityChecker;
+  private final DatasetConfigDTO dataset;
+  private final MetricEntity metricEntity;
+  private final MetricConfigDTO metric;
+  private final String qualityCheckerKey;
+  private final String entityName;
+  private final String metricUrn;
+
+  public DataSlaWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+    super(provider, config, startTime, endTime);
+
+    Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_SUB_ENTITY_NAME));
+    this.entityName = MapUtils.getString(config.getProperties(), PROP_SUB_ENTITY_NAME);
+
+    Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_QUALITY_CHECK));
+    this.qualityCheckerKey = DetectionUtils.getComponentKey(MapUtils.getString(config.getProperties(), PROP_QUALITY_CHECK));
+    Preconditions.checkArgument(this.config.getComponents().containsKey(this.qualityCheckerKey));
+    this.qualityChecker = (AnomalyDetector) this.config.getComponents().get(this.qualityCheckerKey);
+
+    this.metricUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
+    this.metricEntity = MetricEntity.fromURN(this.metricUrn);
+    this.metric = provider.fetchMetrics(Collections.singleton(this.metricEntity.getId())).get(this.metricEntity.getId());
+
+    MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(this.metricEntity.getId())).get(this.metricEntity.getId());
+    this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
+        .get(metricConfigDTO.getDataset());
+  }
+
+  @Override
+  public DetectionPipelineResult run() throws Exception {
+    LOG.info("Check data sla for config {} between {} and {}", config.getId(), startTime, endTime);
+    Interval window =  new Interval(startTime, endTime, DateTimeZone.forID(dataset.getTimezone()));
+    DetectionResult detectionResult = qualityChecker.runDetection(window, this.metricUrn);
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>(detectionResult.getAnomalies());
+
+    for (MergedAnomalyResultDTO anomaly : anomalies) {
+      anomaly.setDetectionConfigId(this.config.getId());
+      anomaly.setMetricUrn(this.metricUrn);
+      anomaly.setMetric(this.metric.getName());
+      anomaly.setCollection(this.metric.getDataset());
+      anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
+      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.qualityCheckerKey);
+      anomaly.getProperties().put(PROP_SUB_ENTITY_NAME, this.entityName);
+    }
+
+    return new DetectionPipelineResult(anomalies);
+  }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunner.java
deleted file mode 100644
index 359a295..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunner.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.datasla;
-
-import com.google.common.collect.ArrayListMultimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.pinot.thirdeye.anomaly.AnomalyType;
-import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
-import org.apache.pinot.thirdeye.anomaly.task.TaskInfo;
-import org.apache.pinot.thirdeye.anomaly.task.TaskResult;
-import org.apache.pinot.thirdeye.anomaly.task.TaskRunner;
-import org.apache.pinot.thirdeye.common.time.TimeGranularity;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
-import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.pojo.MergedAnomalyResultBean;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
-import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
-import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
-import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
-import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
-import org.apache.pinot.thirdeye.detection.ConfigUtils;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
-import org.apache.pinot.thirdeye.detection.DetectionUtils;
-import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
-import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil.*;
-
-
-/**
- * Runner that generates Data SLA anomalies. DATA_MISSING anomalies are created if
- * the data is not available for the sla detection window within the configured SLA.
- */
-public class DatasetSlaTaskRunner implements TaskRunner {
-  private static final Logger LOG = LoggerFactory.getLogger(DatasetSlaTaskRunner.class);
-
-  private final DetectionConfigManager detectionDAO;
-  private final MergedAnomalyResultManager anomalyDAO;
-  private final EvaluationManager evaluationDAO;
-  private DataProvider provider;
-
-  private final String DEFAULT_DATA_SLA = "3_DAYS";
-
-  public DatasetSlaTaskRunner() {
-    this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
-    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
-    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
-
-    MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
-    DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
-    EventManager eventDAO = DAORegistry.getInstance().getEventDAO();
-
-    TimeSeriesLoader timeseriesLoader =
-        new DefaultTimeSeriesLoader(metricDAO, datasetDAO,
-            ThirdEyeCacheRegistry.getInstance().getQueryCache(), ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
-
-    AggregationLoader aggregationLoader =
-        new DefaultAggregationLoader(metricDAO, datasetDAO,
-            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
-            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
-
-    this.provider = new DefaultDataProvider(metricDAO, datasetDAO, eventDAO, this.anomalyDAO, this.evaluationDAO,
-        timeseriesLoader, aggregationLoader, new DetectionPipelineLoader(), TimeSeriesCacheBuilder.getInstance(),
-        AnomaliesCacheBuilder.getInstance());
-  }
-
-  public DatasetSlaTaskRunner(DetectionConfigManager detectionDAO, MergedAnomalyResultManager anomalyDAO,
-      EvaluationManager evaluationDAO, DataProvider provider) {
-    this.detectionDAO = detectionDAO;
-    this.anomalyDAO = anomalyDAO;
-    this.evaluationDAO = evaluationDAO;
-    this.provider = provider;
-  }
-
-  @Override
-  public List<TaskResult> execute(TaskInfo taskInfo, TaskContext taskContext) throws Exception {
-    dataAvailabilityTaskCounter.inc();
-
-    try {
-      DetectionPipelineTaskInfo info = (DetectionPipelineTaskInfo) taskInfo;
-
-      DetectionConfigDTO config = this.detectionDAO.findById(info.getConfigId());
-      if (config == null) {
-        throw new IllegalArgumentException(String.format("Could not resolve config id %d", info.getConfigId()));
-      }
-
-      LOG.info("Check data sla for config {} between {} and {}", config.getId(), info.getStart(), info.getEnd());
-      Map<String, Object> metricUrnToSlaMap = config.getDataSLAProperties();
-      if (MapUtils.isNotEmpty(metricUrnToSlaMap)) {
-        for (Map.Entry<String, Object> metricUrnToSlaMapEntry : metricUrnToSlaMap.entrySet()) {
-          MetricEntity me = MetricEntity.fromURN(metricUrnToSlaMapEntry.getKey());
-          MetricConfigDTO
-              metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
-          Map<String, DatasetConfigDTO> datasetToConfigMap = provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()));
-          for (Map.Entry<String, DatasetConfigDTO> datasetSLA : datasetToConfigMap.entrySet()) {
-            try {
-              runSLACheck(me, datasetSLA.getValue(), info, ConfigUtils.getMap(metricUrnToSlaMapEntry.getValue()));
-            } catch (Exception e) {
-              LOG.error("Failed to run sla check on metric URN %s", datasetSLA.getKey(), e);
-            }
-          }
-        }
-      }
-
-      //LOG.info("End data availability for config {} between {} and {}. Detected {} anomalies.", config.getId(),
-      //    info.getStart(), info.getEnd(), anomaliesList.size());
-      return Collections.emptyList();
-    } catch(Exception e) {
-      throw e;
-    }
-  }
-
-  /**
-   * Runs the data sla check for the window (info) on the given metric (me) using the configured sla properties
-   */
-  private void runSLACheck(MetricEntity me, DatasetConfigDTO datasetConfig, DetectionPipelineTaskInfo info,
-      Map<String, Object> slaProps) {
-    if (me == null || datasetConfig == null || info == null) {
-      //nothing to check
-      return;
-    }
-
-    try {
-      long datasetLastRefreshTime = datasetConfig.getLastRefreshTime();
-      if (datasetLastRefreshTime <= 0) {
-        // assume we have processed data till the current detection start
-        datasetLastRefreshTime = info.getStart() - 1;
-      }
-
-      if (isMissingData(datasetLastRefreshTime, info)) {
-        // Double check with data source as 2 things are possible.
-        // 1. This dataset/source may not support availability events
-        // 2. The data availability event pipeline has some issue.
-        // We want to measure the overall dataset availability (filters are not taken into account)
-        MetricSlice metricSlice = MetricSlice.from(me.getId(), info.getStart(), info.getEnd(), ArrayListMultimap.<String, String>create());
-        DataFrame dataFrame = this.provider.fetchTimeseries(Collections.singleton(metricSlice)).get(metricSlice);
-        if (dataFrame == null || dataFrame.isEmpty()) {
-          // no data
-          if (hasMissedSLA(datasetLastRefreshTime, slaProps, info.getEnd())) {
-            createDataSLAAnomaly(datasetLastRefreshTime + 1, info.getEnd(), info.getConfigId(), datasetConfig, slaProps);
-          }
-        } else {
-          datasetLastRefreshTime = dataFrame.getDoubles("timestamp").max().longValue();
-          if (isPartialData(datasetLastRefreshTime, info, datasetConfig)) {
-            if (hasMissedSLA(datasetLastRefreshTime, slaProps, info.getEnd())) {
-              createDataSLAAnomaly(datasetLastRefreshTime + 1, info.getEnd(), info.getConfigId(), datasetConfig, slaProps);
-            }
-          }
-        }
-      } else if (isPartialData(datasetLastRefreshTime, info, datasetConfig)) {
-        // Optimize for the common case, where the data availability events are arriving
-        // correctly and we need not re-fetch the data to double-check.
-        if (hasMissedSLA(datasetLastRefreshTime, slaProps, info.getEnd())) {
-          createDataSLAAnomaly(datasetLastRefreshTime + 1, info.getEnd(), info.getConfigId(), datasetConfig, slaProps);
-        }
-      }
-    } catch (Exception e) {
-      LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
-    }
-  }
-
-  /**
-   * We say the data is missing if we do not have data in the sla detection window,
-   * or more specifically if the dataset watermark is below the sla detection window.
-   */
-  private boolean isMissingData(long datasetLastRefreshTime, DetectionPipelineTaskInfo info) {
-    return datasetLastRefreshTime < info.getStart();
-  }
-
-  /**
-   * We say the data is partial if we do not have all the data points in the sla detection window,
-   * or more specifically if at least 1 data point is missing in the sla detection window.
-   *
-   * For example:
-   * Assume that the data is delayed and our current sla detection window is [1st Feb to 3rd Feb). During this scan,
-   * let's say data for 1st Feb arrives. Now, we have a situation where partial data is present. In other words, in the
-   * current sla detection window [1st to 3rd Feb) we have data for 1st Feb but data for 2nd Feb is missing. This is the
-   * partial data scenario.
-   */
-  private boolean isPartialData(long datasetLastRefreshTime, DetectionPipelineTaskInfo info, DatasetConfigDTO datasetConfig) {
-    long granularity = datasetConfig.bucketTimeGranularity().toMillis();
-    return (info.getEnd() - datasetLastRefreshTime) * 1.0 / granularity > 1;
-  }
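For concreteness, here is the javadoc example above in numbers - a self-contained sketch in plain Java (the epoch offsets are illustrative, counted in days, and `DAY_MS` is a local constant rather than a ThirdEye symbol):

```java
// Window [Feb 1, Feb 3) over a daily dataset; only Feb 1's data has arrived.
long DAY_MS = 24L * 60 * 60 * 1000;
long windowEnd = 3 * DAY_MS;          // exclusive end of the sla detection window (Feb 3)
long lastRefresh = 1 * DAY_MS;        // watermark: the latest data point is Feb 1's bucket
double lateBuckets = (windowEnd - lastRefresh) * 1.0 / DAY_MS;  // = 2.0
boolean partial = lateBuckets > 1;    // true -> Feb 2's bucket is missing
```

Had Feb 2's data also arrived, `lastRefresh` would be `2 * DAY_MS`, the ratio would be exactly 1, and the window would not be flagged as partial.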
-
-  /**
-   * Validates if the data is delayed or not based on the user specified SLA configuration
-   */
-  private boolean hasMissedSLA(long datasetLastRefreshTime, Map<String, Object> slaProps, long slaDetectionEndTime) {
-    // fetch the user configured SLA, otherwise default 3_DAYS.
-    long delay = TimeGranularity.fromString(MapUtils.getString(slaProps,  "sla", DEFAULT_DATA_SLA))
-        .toPeriod().toStandardDuration().getMillis();
-
-    return (slaDetectionEndTime - datasetLastRefreshTime) >= delay;
-  }
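The SLA comparison itself, as a hedged sketch (the `3_DAYS` string follows the `<size>_<unit>` format parsed by `TimeGranularity.fromString`; the timestamps are illustrative, with `DAY_MS` as above):

```java
// Data is considered late once the watermark lags the detection end by >= the sla.
long slaMillis = TimeGranularity.fromString("3_DAYS")
    .toPeriod().toStandardDuration().getMillis();     // 3 days = 259,200,000 ms
long slaDetectionEnd = 10L * DAY_MS;                  // illustrative window end
long watermark = slaDetectionEnd - slaMillis;         // data exactly 3 days behind
boolean missedSla = (slaDetectionEnd - watermark) >= slaMillis;  // true, the bound is inclusive
```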
-
-  /**
-   * Align and round off start time to the upper boundary of the granularity
-   */
-  private static long alignToUpperBoundary(long start, DatasetConfigDTO datasetConfig) {
-    Period granularityPeriod = datasetConfig.bucketTimeGranularity().toPeriod();
-    DateTimeZone timezone = DateTimeZone.forID(datasetConfig.getTimezone());
-    DateTime startTime = new DateTime(start - 1, timezone).plus(granularityPeriod);
-    return startTime.getMillis() / granularityPeriod.toStandardDuration().getMillis() * granularityPeriod.toStandardDuration().getMillis();
-  }
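A worked example of the ceiling alignment, assuming Joda-Time (`org.joda.time.DateTime`, `DateTimeZone`, `Period`) and a daily UTC dataset; the date is illustrative:

```java
// A start that falls mid-bucket rounds up to the next bucket boundary.
Period granularity = Period.days(1);
DateTimeZone tz = DateTimeZone.UTC;
long bucketMs = granularity.toStandardDuration().getMillis();
long start = new DateTime(2020, 2, 1, 5, 0, tz).getMillis();   // Feb 1, 05:00 UTC
long aligned = new DateTime(start - 1, tz).plus(granularity).getMillis() / bucketMs * bucketMs;
// aligned == Feb 2, 00:00 UTC; a start already on a boundary maps to itself
// because of the (start - 1) adjustment before adding the granularity period.
```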
-
-  /**
-   * Merges the new DATA_MISSING anomaly with the existing parent SLA anomaly in the window.
-   */
-  private void mergeSLAAnomalies(MergedAnomalyResultDTO anomaly, List<MergedAnomalyResultDTO> existingAnomalies) {
-    // Extract the parent SLA anomaly. We can have only 1 parent DATA_MISSING anomaly in a window.
-    existingAnomalies.removeIf(MergedAnomalyResultBean::isChild);
-    MergedAnomalyResultDTO existingParentSLAAnomaly = existingAnomalies.get(0);
-
-    if (isDuplicateSLAAnomaly(existingParentSLAAnomaly, anomaly)) {
-      // Ensure anomalies are not duplicated. Ignore and return.
-      // Example: daily data with hourly cron should generate only 1 sla alert if data is missing
-      return;
-    }
-    existingParentSLAAnomaly.setChild(true);
-    anomaly.setChild(false);
-    anomaly.setChildren(Collections.singleton(existingParentSLAAnomaly));
-    this.anomalyDAO.save(anomaly);
-    if (anomaly.getId() == null) {
-      LOG.warn("Could not store data sla check failed anomaly:\n{}", anomaly);
-    }
-  }
-
-  /**
-   * Creates a DATA_MISSING anomaly from ceiling(start) to ceiling(end) for the detection id.
-   * If existing DATA_MISSING anomalies are present, then it will be merged accordingly.
-   */
-  private void createDataSLAAnomaly(long start, long end, long detectionId, DatasetConfigDTO datasetConfig,
-      Map<String, Object> slaProps) {
-    MergedAnomalyResultDTO anomaly = DetectionUtils.makeAnomaly(
-        alignToUpperBoundary(start, datasetConfig),
-        alignToUpperBoundary(end, datasetConfig),
-        detectionId);
-    anomaly.setType(AnomalyType.DATA_MISSING);
-
-    // Store the metadata in the anomaly
-    Map<String, String> properties = new HashMap<>();
-    properties.put("datasetLastRefreshTime", String.valueOf(start - 1));
-    slaProps.forEach((k, v) -> {
-      properties.put(k, v.toString());
-    });
-    anomaly.setProperties(properties);
-
-    List<MergedAnomalyResultDTO> existingAnomalies = anomalyDAO.findAnomaliesWithinBoundary(start, end, detectionId);
-    if (!existingAnomalies.isEmpty()) {
-      mergeSLAAnomalies(anomaly, existingAnomalies);
-    } else {
-      // no merging required
-      this.anomalyDAO.save(anomaly);
-      if (anomaly.getId() == null) {
-        LOG.warn("Could not store data sla check failed anomaly:\n{}", anomaly);
-      }
-    }
-  }
-
-  /**
-   * We say one DATA_MISSING anomaly is a duplicate of another if they belong to the same detection with the same type
-   * and span the exact same duration.
-   */
-  private boolean isDuplicateSLAAnomaly(MergedAnomalyResultDTO anomaly1, MergedAnomalyResultDTO anomaly2) {
-    if (anomaly1 == null && anomaly2 == null) {
-      return true;
-    }
-
-    if (anomaly1 == null || anomaly2 == null) {
-      return false;
-    }
-
-    // anomalies belong to the same detection, same type & span over the same duration
-    return anomaly1.getDetectionConfigId().equals(anomaly2.getDetectionConfigId())
-        && anomaly1.getType() == anomaly2.getType()
-        && anomaly1.getStartTime() == anomaly2.getStartTime()
-        && anomaly1.getEndTime() == anomaly2.getEndTime();
-  }
-}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java
index fbefa5f..b3cee3e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java
@@ -177,6 +177,7 @@ public class DetectionConfigValidator implements ConfigValidator<DetectionConfig
       // Validate detection rules
       Preconditions.checkArgument(ruleYaml.containsKey(PROP_DETECTION),
           "Detection rule missing for sub-alert " + alertName + " rule no. " + ruleIndex);
+      // Validate each detection rule
       List<Map<String, Object>> detectionRuleYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
       for (Map<String, Object> detectionRuleYaml : detectionRuleYamls) {
         validateRule(alertName, detectionRuleYaml, ruleIndex, "detection", names);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 26bd38a..df5f593 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Collections2;
 import java.util.HashMap;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
 import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
 import org.apache.pinot.thirdeye.dataframe.Series;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
@@ -145,6 +146,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
         Collections2.filter(retrieved,
                 mergedAnomaly -> mergedAnomaly != null &&
                 !mergedAnomaly.isChild() &&
+                mergedAnomaly.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION) &&
                    // merge only if the anomaly was generated by the same detector
                 this.detectorComponentName.equals(mergedAnomaly.getProperties().getOrDefault(PROP_DETECTOR_COMPONENT_NAME, "")) &&
                    // merge only if the anomaly is in the same dimension
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 4455a63..d2a1320 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.detection.DataProvider;
@@ -59,8 +60,9 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
     Collection<MergedAnomalyResultDTO> anomalies =
         this.provider.fetchAnomalies(Collections.singleton(effectiveSlice)).get(effectiveSlice);
 
-    return anomalies.stream()
-        .filter(anomaly -> !anomaly.isChild() && ThirdEyeUtils.isDetectedByMultipleComponents(anomaly))
+    return anomalies.stream().filter(anomaly -> !anomaly.isChild()
+        && anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION)
+        && ThirdEyeUtils.isDetectedByMultipleComponents(anomaly))
         .collect(Collectors.toList());
   }
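The two wrapper diffs above add the same guard. Distilled into one hedged helper (`isMergeCandidate` is written for illustration, not part of the patch), detection-time merging now only ever considers parent anomalies that were produced by regular detection:

```java
// Data-quality (sla) anomalies carry a different AnomalyResultSource, so they
// are never merged into, or used as a baseline for, detection anomalies.
static boolean isMergeCandidate(MergedAnomalyResultDTO anomaly) {
  return !anomaly.isChild()
      && AnomalyResultSource.DEFAULT_ANOMALY_DETECTION.equals(anomaly.getAnomalyResultSource());
}
```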
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java
new file mode 100644
index 0000000..23540e1
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.wrapper;
+
+import com.google.common.collect.Collections2;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
+import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+
+
+/**
+ * The Data Quality Merge Wrapper. This merge wrapper is designed specifically with sla anomalies in mind and
+ * may need to be revisited when more data quality rules are added. Fundamentally, data sla anomalies are never
+ * merged, because we want to keep re-notifying users after every detection run for as long as the sla is missed.
+ * This merger ensures that no duplicate sla anomalies are created when the detection runs more frequently than
+ * the dataset granularity, and it serves as a placeholder for future merging logic.
+ */
+public class DataQualityMergeWrapper extends MergeWrapper {
+  private static final String PROP_GROUP_KEY = "groupKey";
+
+  public DataQualityMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+    super(provider, config, startTime, endTime);
+  }
+
+  @Override
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    List<MergedAnomalyResultDTO> retrieved = super.retrieveAnomaliesFromDatabase(generated);
+
+    return new ArrayList<>(Collections2.filter(retrieved,
+        anomaly -> !anomaly.isChild() &&
+            anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION)
+    ));
+  }
+
+  @Override
+  protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
+    List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
+    input.sort(MergeWrapper.COMPARATOR);
+
+    List<MergedAnomalyResultDTO> output = new ArrayList<>();
+    Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>();
+    for (MergedAnomalyResultDTO anomaly : input) {
+      if (anomaly.isChild()) {
+        continue;
+      }
+
+      // Prevent merging of grouped anomalies
+      String groupKey = "";
+      if (anomaly.getProperties().containsKey(PROP_GROUP_KEY)) {
+        groupKey = anomaly.getProperties().get(PROP_GROUP_KEY);
+      }
+      AnomalyKey key = new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), groupKey,
+          "", anomaly.getType());
+
+      MergedAnomalyResultDTO parent = parents.get(key);
+      if (parent == null) {
+        parents.put(key, anomaly);
+        output.add(anomaly);
+      } else if (anomaly.getEndTime() <= parent.getEndTime()) {
+        // Ignore as an sla anomaly already exists - we do not want a duplicate.
+        // This can happen if the detection is set up to run more frequently than
+        // the dataset granularity.
+      } else {
+        // keep the anomaly - it extends beyond the existing sla anomaly for this key
+        parents.put(key, anomaly);
+        output.add(anomaly);
+      }
+    }
+
+    return output;
+  }
+}
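A hedged walk-through of the `merge()` dedup above, for the common case called out in the comments: an hourly cron over a daily dataset (timestamps illustrative):

```java
// The first run emits an sla anomaly ending at the day boundary; later hourly
// runs regenerate the same day-aligned window, so candidateEnd <= parentEnd
// holds and the candidates are ignored instead of being saved as duplicates.
long dayMs = 86_400_000L;
long parentEnd = 3 * dayMs;          // sla anomaly emitted by the first run
long candidateEnd = 3 * dayMs;       // identical window from the next hourly run
boolean ignored = candidateEnd <= parentEnd;   // true -> no duplicate is created
```

A later candidate is kept only once its window end advances past the existing parent's, i.e. the data has stayed missing long enough for the sla window to grow.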
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java
index 1610818..7304b21 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java
@@ -20,45 +20,26 @@
 package org.apache.pinot.thirdeye.detection.yaml.translator;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections4.MapUtils;
-import org.apache.pinot.thirdeye.common.time.TimeGranularity;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionUtils;
-import org.apache.pinot.thirdeye.detection.algorithm.DimensionWrapper;
-import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
 import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
-import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.AnomalyFilterWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.BaselineFillingMergeWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-
-import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
-import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
+import org.apache.pinot.thirdeye.detection.yaml.translator.builder.DetectionConfigPropertiesBuilder;
+import org.apache.pinot.thirdeye.detection.yaml.translator.builder.DataQualityPropertiesBuilder;
+import org.apache.pinot.thirdeye.detection.yaml.translator.builder.DetectionPropertiesBuilder;
 
 
 /**
  * The YAML translator of composite pipeline. Convert YAML file into detection pipeline properties.
 * This pipeline supports multiple detection algorithms running in an OR relationship.
 * It supports running multiple filter rules in an AND relationship.
- * Conceptually, the pipeline has the following structure. (Grouper not implemented yet).
+ * Conceptually, the pipeline has the following structure.
  *
  *         +-----------+
  *         | Dimension |
@@ -95,7 +76,7 @@ import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
  *               |
  *               |
  *         +-----v-----+
- *         |  Grouper* |
+ *         |  Grouper  |
  *         |           |
  *         +-----------+
  *
@@ -128,53 +109,22 @@ import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
 public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigDTO, DetectionConfigValidator> {
   public static final String PROP_SUB_ENTITY_NAME = "subEntityName";
 
-  public static final String PROP_DIMENSION_EXPLORATION = "dimensionExploration";
-  public static final String PROP_FILTERS = "filters";
-
-  private static final String PROP_DETECTION = "detection";
-  private static final String PROP_CRON = "cron";
-  private static final String PROP_FILTER = "filter";
-  private static final String PROP_TYPE = "type";
-  private static final String PROP_CLASS_NAME = "className";
-  private static final String PROP_PARAMS = "params";
-  private static final String PROP_METRIC_URN = "metricUrn";
-  private static final String PROP_DIMENSION_FILTER_METRIC = "dimensionFilterMetric";
-  private static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
-  private static final String PROP_RULES = "rules";
-  private static final String PROP_GROUPER = "grouper";
-  private static final String PROP_NESTED = "nested";
-  private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
-  private static final String PROP_DETECTOR = "detector";
-  private static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
-  private static final String PROP_WINDOW_DELAY = "windowDelay";
-  private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
-  private static final String PROP_WINDOW_SIZE = "windowSize";
-  private static final String PROP_WINDOW_UNIT = "windowUnit";
-  private static final String PROP_FREQUENCY = "frequency";
-  private static final String PROP_MERGER = "merger";
-  private static final String PROP_TIMEZONE = "timezone";
-  private static final String PROP_NAME = "name";
-  private static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
-  private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
-  private static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
+  static final String PROP_CRON = "cron";
+  static final String PROP_TYPE = "type";
+  static final String PROP_NAME = "name";
 
   private static final String PROP_DETECTION_NAME = "detectionName";
   private static final String PROP_DESC_NAME = "description";
   private static final String PROP_ACTIVE = "active";
   private static final String PROP_OWNERS = "owners";
 
-  private static final String PROP_ALERTS = "alerts";
-  private static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
-  private static final String METRIC_ALERT = "METRIC_ALERT";
-
+  static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
+  static final String METRIC_ALERT = "METRIC_ALERT";
 
-  private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
-  private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM");
-
-  private final Map<String, Object> components = new HashMap<>();
   private DataProvider dataProvider;
+  private DetectionConfigPropertiesBuilder detectionTranslatorBuilder;
+  private DetectionConfigPropertiesBuilder dataQualityTranslatorBuilder;
   private DetectionMetricAttributeHolder metricAttributesMap;
-  private Set<DatasetConfigDTO> datasetConfigs;
 
   public DetectionConfigTranslator(String yamlConfig, DataProvider provider) {
     this(yamlConfig, provider, new DetectionConfigValidator(provider));
@@ -184,99 +134,8 @@ public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigD
     super(yamlConfig, validator);
     this.dataProvider = provider;
     this.metricAttributesMap = new DetectionMetricAttributeHolder(provider);
-    this.datasetConfigs = new HashSet<>();
-  }
-
-  private Map<String, Object> translateMetricAlert(Map<String, Object> metricAlertConfigMap) {
-    String subEntityName = MapUtils.getString(metricAlertConfigMap, PROP_NAME);
-
-    DatasetConfigDTO datasetConfigDTO = metricAttributesMap.fetchDataset(metricAlertConfigMap);
-    datasetConfigs.add(datasetConfigDTO);
-    Map<String, Collection<String>> dimensionFiltersMap = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_FILTERS));
-    String metricUrn = MetricEntity.fromMetric(dimensionFiltersMap, metricAttributesMap.fetchMetric(metricAlertConfigMap).getId()).getUrn();
-    Map<String, Object> mergerProperties = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_MERGER));
-
-    // Translate all the rules
-    List<Map<String, Object>> ruleYamls = getList(metricAlertConfigMap.get(PROP_RULES));
-    List<Map<String, Object>> nestedPipelines = new ArrayList<>();
-    for (Map<String, Object> ruleYaml : ruleYamls) {
-      List<Map<String, Object>> filterYamls = ConfigUtils.getList(ruleYaml.get(PROP_FILTER));
-      List<Map<String, Object>> detectionYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
-      List<Map<String, Object>> detectionProperties = buildListOfMergeWrapperProperties(
-          subEntityName, metricUrn, detectionYamls, mergerProperties,
-          datasetConfigDTO.bucketTimeGranularity());
-      if (filterYamls.isEmpty()) {
-        nestedPipelines.addAll(detectionProperties);
-      } else {
-        List<Map<String, Object>> filterNestedProperties = detectionProperties;
-        for (Map<String, Object> filterProperties : filterYamls) {
-          filterNestedProperties = buildFilterWrapperProperties(metricUrn, AnomalyFilterWrapper.class.getName(), filterProperties,
-              filterNestedProperties);
-        }
-        nestedPipelines.addAll(filterNestedProperties);
-      }
-    }
-
-    // Wrap with dimension exploration properties
-    Map<String, Object> dimensionWrapperProperties = buildDimensionWrapperProperties(
-        metricAlertConfigMap, dimensionFiltersMap, metricUrn, datasetConfigDTO.getDataset());
-    Map<String, Object> properties = buildWrapperProperties(
-        ChildKeepingMergeWrapper.class.getName(),
-        Collections.singletonList(buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)),
-        mergerProperties);
-
-    // Wrap with metric level grouper, restricting to only 1 grouper
-    List<Map<String, Object>> grouperYamls = getList(metricAlertConfigMap.get(PROP_GROUPER));
-    if (!grouperYamls.isEmpty()) {
-      properties = buildWrapperProperties(
-          EntityAnomalyMergeWrapper.class.getName(),
-          Collections.singletonList(buildGroupWrapperProperties(subEntityName, metricUrn, grouperYamls.get(0), Collections.singletonList(properties))),
-          mergerProperties);
-
-      properties = buildWrapperProperties(
-          ChildKeepingMergeWrapper.class.getName(),
-          Collections.singletonList(properties),
-          mergerProperties);
-    }
-
-    return properties;
-  }
-
-  private Map<String, Object> translateCompositeAlert(Map<String, Object> compositeAlertConfigMap) {
-    Map<String, Object> properties;
-    String subEntityName = MapUtils.getString(compositeAlertConfigMap, PROP_NAME);
-
-    // Recursively translate all the sub-alerts
-    List<Map<String, Object>> subDetectionYamls = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_ALERTS));
-    List<Map<String, Object>> nestedPropertiesList = new ArrayList<>();
-    for (Map<String, Object> subDetectionYaml : subDetectionYamls) {
-      Map<String, Object> subProps;
-      if (subDetectionYaml.containsKey(PROP_TYPE) && subDetectionYaml.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
-        subProps = translateCompositeAlert(subDetectionYaml);
-      } else {
-        subProps = translateMetricAlert(subDetectionYaml);
-      }
-
-      nestedPropertiesList.add(subProps);
-    }
-
-    // Wrap the entity level grouper, only 1 grouper is supported now
-    List<Map<String, Object>> grouperProps = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_GROUPER));
-    Map<String, Object> mergerProperties = ConfigUtils.getMap(compositeAlertConfigMap.get(PROP_MERGER));
-    if (!grouperProps.isEmpty()) {
-      properties = buildWrapperProperties(
-          EntityAnomalyMergeWrapper.class.getName(),
-          Collections.singletonList(buildGroupWrapperProperties(subEntityName, grouperProps.get(0), nestedPropertiesList)),
-          mergerProperties);
-      nestedPropertiesList = Collections.singletonList(properties);
-    }
-
-    properties = buildWrapperProperties(
-        ChildKeepingMergeWrapper.class.getName(),
-        nestedPropertiesList,
-        mergerProperties);
-
-    return properties;
+    this.detectionTranslatorBuilder = new DetectionPropertiesBuilder(metricAttributesMap, provider);
+    this.dataQualityTranslatorBuilder = new DataQualityPropertiesBuilder(metricAttributesMap, provider);
   }
 
   @Override
@@ -291,10 +150,12 @@ public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigD
     yamlConfigMap.putIfAbsent(PROP_TYPE, METRIC_ALERT);
 
     // Translate config depending on the type (METRIC_ALERT OR COMPOSITE_ALERT)
-    Map<String, Object> properties;
+    Map<String, Object> detectionProperties;
+    Map<String, Object> qualityProperties;
     String cron;
     if (yamlConfigMap.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
-      properties = translateCompositeAlert(yamlConfigMap);
+      detectionProperties = detectionTranslatorBuilder.buildCompositeAlertProperties(yamlConfigMap);
+      qualityProperties = dataQualityTranslatorBuilder.buildCompositeAlertProperties(yamlConfigMap);
 
       // TODO: discuss strategy for default cron
       Preconditions.checkArgument(yamlConfigMap.containsKey(PROP_CRON), "Missing property (" + PROP_CRON + ") in alert");
@@ -302,238 +163,35 @@ public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigD
     } else {
       // The legacy type 'COMPOSITE' will be treated as a metric alert along with the new convention METRIC_ALERT.
       // This is applicable only at the root level to maintain backward compatibility.
-      properties = translateMetricAlert(yamlConfigMap);
+      detectionProperties = detectionTranslatorBuilder.buildMetricAlertProperties(yamlConfigMap);
+      qualityProperties = dataQualityTranslatorBuilder.buildMetricAlertProperties(yamlConfigMap);
       cron = metricAttributesMap.fetchCron(yamlConfigMap);
     }
 
-    return generateDetectionConfig(yamlConfigMap, properties, this.components, cron);
-  }
-
-  private Map<String, Object> buildDimensionWrapperProperties(Map<String, Object> yamlConfigMap,
-      Map<String, Collection<String>> dimensionFilters, String metricUrn, String datasetName) {
-    Map<String, Object> dimensionWrapperProperties = new HashMap<>();
-    dimensionWrapperProperties.put(PROP_NESTED_METRIC_URNS, Collections.singletonList(metricUrn));
-    if (yamlConfigMap.containsKey(PROP_DIMENSION_EXPLORATION)) {
-      Map<String, Object> dimensionExploreYaml = ConfigUtils.getMap(yamlConfigMap.get(PROP_DIMENSION_EXPLORATION));
-      dimensionWrapperProperties.putAll(dimensionExploreYaml);
-      if (dimensionExploreYaml.containsKey(PROP_DIMENSION_FILTER_METRIC)){
-        MetricConfigDTO dimensionExploreMetric = this.dataProvider.fetchMetric(MapUtils.getString(dimensionExploreYaml, PROP_DIMENSION_FILTER_METRIC), datasetName);
-        dimensionWrapperProperties.put(PROP_METRIC_URN, MetricEntity.fromMetric(dimensionFilters, dimensionExploreMetric.getId()).getUrn());
-      } else {
-        dimensionWrapperProperties.put(PROP_METRIC_URN, metricUrn);
-      }
-    }
-    return dimensionWrapperProperties;
-  }
-
-  private List<Map<String, Object>> buildListOfMergeWrapperProperties(String subEntityName, String metricUrn,
-      List<Map<String, Object>> yamlConfigs, Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
-    List<Map<String, Object>> properties = new ArrayList<>();
-    for (Map<String, Object> yamlConfig : yamlConfigs) {
-      properties.add(buildMergeWrapperProperties(subEntityName, metricUrn, yamlConfig, mergerProperties, datasetTimegranularity));
-    }
-    return properties;
-  }
-
-  private Map<String, Object> buildMergeWrapperProperties(String subEntityName, String metricUrn, Map<String, Object> yamlConfig,
-      Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
-    String detectorType = MapUtils.getString(yamlConfig, PROP_TYPE);
-    String name = MapUtils.getString(yamlConfig, PROP_NAME);
-    Map<String, Object> nestedProperties = new HashMap<>();
-    nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
-    nestedProperties.put(PROP_SUB_ENTITY_NAME, subEntityName);
-    String detectorRefKey = makeComponentRefKey(detectorType, name);
-
-    fillInDetectorWrapperProperties(nestedProperties, yamlConfig, detectorType, datasetTimegranularity);
-
-    buildComponentSpec(metricUrn, yamlConfig, detectorType, detectorRefKey);
-
-    Map<String, Object> properties = new HashMap<>();
-    properties.put(PROP_CLASS_NAME, BaselineFillingMergeWrapper.class.getName());
-    properties.put(PROP_NESTED, Collections.singletonList(nestedProperties));
-    properties.put(PROP_DETECTOR, detectorRefKey);
-
-    // fill in baseline provider properties
-    if (DETECTION_REGISTRY.isBaselineProvider(detectorType)) {
-      // if the detector implements the baseline provider interface, use it to generate baseline
-      properties.put(PROP_BASELINE_PROVIDER, detectorRefKey);
-    } else {
-      String baselineProviderType = DEFAULT_BASELINE_PROVIDER_YAML_TYPE;
-      String baselineProviderKey = makeComponentRefKey(baselineProviderType, name);
-      buildComponentSpec(metricUrn, yamlConfig, baselineProviderType, baselineProviderKey);
-      properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
-    }
-    properties.putAll(mergerProperties);
-    return properties;
-  }
-
-  private Map<String, Object> buildGroupWrapperProperties(String entityName, Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
-    return buildGroupWrapperProperties(entityName, null, grouperYaml, nestedProps);
-  }
-
-  private Map<String, Object> buildGroupWrapperProperties(String entityName, String metricUrn,
-      Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
-    Map<String, Object> properties = new HashMap<>();
-    properties.put(PROP_CLASS_NAME, GrouperWrapper.class.getName());
-    properties.put(PROP_NESTED, nestedProps);
-    properties.put(PROP_SUB_ENTITY_NAME, entityName);
-
-    String grouperType = MapUtils.getString(grouperYaml, PROP_TYPE);
-    String grouperName = MapUtils.getString(grouperYaml, PROP_NAME);
-    String grouperRefKey = makeComponentRefKey(grouperType, grouperName);
-    properties.put(PROP_GROUPER, grouperRefKey);
-
-    buildComponentSpec(metricUrn, grouperYaml, grouperType, grouperRefKey);
-
-    return properties;
-  }
-
-  // fill in window size and unit if detector requires this
-  private void fillInDetectorWrapperProperties(Map<String, Object> properties, Map<String, Object> yamlConfig, String detectorType, TimeGranularity datasetTimegranularity) {
-    // set default bucketPeriod
-    properties.put(PROP_BUCKET_PERIOD, datasetTimegranularity.toPeriod().toString());
-
-    // override bucketPeriod now since it is needed by detection window
-    if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
-      properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig, PROP_BUCKET_PERIOD));
-    }
-
-    // set default detection window
-    setDefaultDetectionWindow(properties, detectorType);
-
-    // override other properties from yaml
-    if (yamlConfig.containsKey(PROP_WINDOW_SIZE)) {
-      properties.put(PROP_MOVING_WINDOW_DETECTION, true);
-      properties.put(PROP_WINDOW_SIZE, MapUtils.getString(yamlConfig, PROP_WINDOW_SIZE));
-    }
-    if (yamlConfig.containsKey(PROP_WINDOW_UNIT)) {
-      properties.put(PROP_MOVING_WINDOW_DETECTION, true);
-      properties.put(PROP_WINDOW_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_UNIT));
-    }
-    if (yamlConfig.containsKey(PROP_WINDOW_DELAY)) {
-      properties.put(PROP_WINDOW_DELAY, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY));
-    }
-    if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
-      properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
-    }
-    if (yamlConfig.containsKey(PROP_TIMEZONE)){
-      properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
-    }
-    if (yamlConfig.containsKey(PROP_CACHE_PERIOD_LOOKBACK)) {
-      properties.put(PROP_CACHE_PERIOD_LOOKBACK, MapUtils.getString(yamlConfig, PROP_CACHE_PERIOD_LOOKBACK));
-    }
-  }
-
-  // Set the default detection window if it is not specified.
-  // Here instead of using data granularity we use the detection period to set the default window size.
-  private void setDefaultDetectionWindow(Map<String, Object> properties, String detectorType) {
-    if (MOVING_WINDOW_DETECTOR_TYPES.contains(detectorType)) {
-      properties.put(PROP_MOVING_WINDOW_DETECTION, true);
-      org.joda.time.Period detectionPeriod =
-          org.joda.time.Period.parse(MapUtils.getString(properties, PROP_BUCKET_PERIOD));
-      int days = detectionPeriod.toStandardDays().getDays();
-      int hours = detectionPeriod.toStandardHours().getHours();
-      int minutes = detectionPeriod.toStandardMinutes().getMinutes();
-      if (days >= 1) {
-        properties.put(PROP_WINDOW_SIZE, 1);
-        properties.put(PROP_WINDOW_UNIT, TimeUnit.DAYS);
-      } else if (hours >= 1) {
-        properties.put(PROP_WINDOW_SIZE, 24);
-        properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
-      } else if (minutes >= 1) {
-        properties.put(PROP_WINDOW_SIZE, 6);
-        properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
-        properties.put(PROP_FREQUENCY, new TimeGranularity(15, TimeUnit.MINUTES));
-      } else {
-        properties.put(PROP_WINDOW_SIZE, 6);
-        properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
-      }
-    }
-  }
-
-  private List<Map<String, Object>> buildFilterWrapperProperties(String metricUrn, String wrapperClassName,
-      Map<String, Object> yamlConfig, List<Map<String, Object>> nestedProperties) {
-    if (yamlConfig == null || yamlConfig.isEmpty()) {
-      return nestedProperties;
-    }
-    Map<String, Object> wrapperProperties = buildWrapperProperties(wrapperClassName, nestedProperties);
-    if (wrapperProperties.isEmpty()) {
-      return Collections.emptyList();
-    }
-    String name = MapUtils.getString(yamlConfig, PROP_NAME);
-    String filterType = MapUtils.getString(yamlConfig, PROP_TYPE);
-    String filterRefKey = makeComponentRefKey(filterType, name);
-    wrapperProperties.put(PROP_FILTER, filterRefKey);
-    buildComponentSpec(metricUrn, yamlConfig, filterType, filterRefKey);
-
-    return Collections.singletonList(wrapperProperties);
-  }
-
-  private Map<String, Object> buildWrapperProperties(String wrapperClassName,
-      List<Map<String, Object>> nestedProperties) {
-    return buildWrapperProperties(wrapperClassName, nestedProperties, Collections.emptyMap());
-  }
-
-  private Map<String, Object> buildWrapperProperties(String wrapperClassName,
-      List<Map<String, Object>> nestedProperties, Map<String, Object> defaultProperties) {
-    Map<String, Object> properties = new HashMap<>();
-    List<Map<String, Object>> wrapperNestedProperties = new ArrayList<>();
-    for (Map<String, Object> nested : nestedProperties) {
-      if (nested != null && !nested.isEmpty()) {
-        wrapperNestedProperties.add(nested);
-      }
-    }
-    if (wrapperNestedProperties.isEmpty()) {
-      return properties;
-    }
-    properties.put(PROP_CLASS_NAME, wrapperClassName);
-    properties.put(PROP_NESTED, wrapperNestedProperties);
-    properties.putAll(defaultProperties);
-    return properties;
-  }
-
-  private void buildComponentSpec(String metricUrn, Map<String, Object> yamlConfig, String type, String componentRefKey) {
-    Map<String, Object> componentSpecs = new HashMap<>();
-
-    String componentClassName = DETECTION_REGISTRY.lookup(type);
-    componentSpecs.put(PROP_CLASS_NAME, componentClassName);
-    componentSpecs.put(PROP_METRIC_URN, metricUrn);
-
-    Map<String, Object> params = ConfigUtils.getMap(yamlConfig.get(PROP_PARAMS));
-
-    // For tunable components, the model params are computed from user supplied yaml params and previous model params.
-    // We store the yaml params under a separate key, PROP_YAML_PARAMS, to distinguish from model params.
-    if (DETECTION_REGISTRY.isTunable(componentClassName)) {
-      componentSpecs.put(PROP_YAML_PARAMS, params);
-    } else {
-      componentSpecs.putAll(params);
-    }
-
-    this.components.put(DetectionUtils.getComponentKey(componentRefKey), componentSpecs);
-  }
-
-  private String makeComponentRefKey(String type, String name) {
-    return "$" + name + ":" + type;
+    return generateDetectionConfig(yamlConfigMap, detectionProperties, qualityProperties, cron);
   }
 
   /**
   * Fill in common fields of the detection config. Pipeline properties are filled in by the property builders.
    */
-  private DetectionConfigDTO generateDetectionConfig(Map<String, Object> yamlConfigMap, Map<String, Object> properties,
-      Map<String, Object> components, String cron) {
+  private DetectionConfigDTO generateDetectionConfig(Map<String, Object> yamlConfigMap,
+      Map<String, Object> detectionProperties, Map<String, Object> qualityProperties, String cron) {
     DetectionConfigDTO config = new DetectionConfigDTO();
     config.setName(MapUtils.getString(yamlConfigMap, PROP_DETECTION_NAME));
     config.setDescription(MapUtils.getString(yamlConfigMap, PROP_DESC_NAME));
     config.setLastTimestamp(System.currentTimeMillis());
     config.setOwners(filterOwners(ConfigUtils.getList(yamlConfigMap.get(PROP_OWNERS))));
 
-    config.setProperties(properties);
-    config.setComponentSpecs(components);
+    config.setProperties(detectionProperties);
+    config.setDataQualityProperties(qualityProperties);
+    config.setComponentSpecs(this.metricAttributesMap.getAllComponents());
     config.setCron(cron);
     config.setActive(MapUtils.getBooleanValue(yamlConfigMap, PROP_ACTIVE, true));
     config.setYaml(yamlConfig);
 
     //TODO: the data-availability trigger is enabled only for detections running on PINOT daily datasets
+    List<DatasetConfigDTO> datasetConfigs = this.metricAttributesMap.getAllDatasets();
     if (MapUtils.getString(yamlConfigMap, PROP_CRON) == null
         && datasetConfigs.stream().allMatch(c -> c.bucketTimeGranularity().getUnit().equals(TimeUnit.DAYS))
         && datasetConfigs.stream().allMatch(c -> c.getDataSource().equals(PinotThirdEyeDataSource.DATA_SOURCE_NAME))) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java
index 8d290e8..768e6d0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java
@@ -19,17 +19,14 @@
 
 package org.apache.pinot.thirdeye.detection.yaml.translator;
 
-import com.google.common.base.Preconditions;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.pinot.thirdeye.common.time.TimeGranularity;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.pojo.DatasetConfigBean;
 import org.apache.pinot.thirdeye.detection.DataProvider;
 import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
 
@@ -39,7 +36,7 @@ import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
 /**
  * A data holder to store the processed information per metric
  */
-class DetectionMetricAttributeHolder {
+public class DetectionMetricAttributeHolder {
 
   private static final String PROP_METRIC = "metric";
   private static final String PROP_DATASET = "dataset";
@@ -47,6 +44,8 @@ class DetectionMetricAttributeHolder {
 
   private final Map<String, DetectionMetricProperties> metricAttributesMap = new HashMap<>();
   private final DataProvider dataProvider;
+  private final List<DatasetConfigDTO> datasetConfigs = new ArrayList<>();
+  private final Map<String, Object> components = new HashMap<>();
 
   DetectionMetricAttributeHolder(DataProvider provider) {
     this.dataProvider = provider;
@@ -62,6 +61,7 @@ class DetectionMetricAttributeHolder {
     }
 
     DatasetConfigDTO datasetConfig = fetchDatasetConfigDTO(this.dataProvider, datasetName);
+    datasetConfigs.add(datasetConfig);
 
     MetricConfigDTO metricConfig = this.dataProvider.fetchMetric(metricName, datasetConfig.getDataset());
 
@@ -72,15 +72,19 @@ class DetectionMetricAttributeHolder {
     return metricAliasKey;
   }
 
-  DatasetConfigDTO fetchDataset(Map<String, Object> metricAlertConfigMap) {
+  public List<DatasetConfigDTO> getAllDatasets() {
+    return datasetConfigs;
+  }
+
+  public DatasetConfigDTO fetchDataset(Map<String, Object> metricAlertConfigMap) {
     return metricAttributesMap.get(loadMetricCache(metricAlertConfigMap)).getDatasetConfigDTO();
   }
 
-  MetricConfigDTO fetchMetric(Map<String, Object> metricAlertConfigMap) {
+  public MetricConfigDTO fetchMetric(Map<String, Object> metricAlertConfigMap) {
     return metricAttributesMap.get(loadMetricCache(metricAlertConfigMap)).getMetricConfigDTO();
   }
 
-  String fetchCron(Map<String, Object> metricAlertConfigMap) {
+  public String fetchCron(Map<String, Object> metricAlertConfigMap) {
     return metricAttributesMap.get(loadMetricCache(metricAlertConfigMap)).getCron();
   }
 
@@ -101,4 +105,13 @@ class DetectionMetricAttributeHolder {
         return "0 0 0 * * ?";
     }
   }
+
+  public void addComponent(String componentKey, Map<String, Object> componentSpecs) {
+    components.put(componentKey, componentSpecs);
+  }
+
+  public Map<String, Object> getAllComponenets() {
+    return components;
+  }
+
 }
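Since the holder is now shared by the detection and data-quality builders, it doubles as an accumulator: components and datasets registered by either builder are drained once at the end of translation. A brief usage sketch (`holder` and `componentSpecs` are hypothetical locals):

```java
// Each builder registers its components against the same holder instance...
holder.addComponent("$slaRule1:DATA_SLA", componentSpecs);
// ...and the translator collects everything in one pass when generating the config.
Map<String, Object> allComponents = holder.getAllComponents();
List<DatasetConfigDTO> allDatasets = holder.getAllDatasets();
```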
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DataQualityPropertiesBuilder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DataQualityPropertiesBuilder.java
new file mode 100644
index 0000000..897a1f7
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DataQualityPropertiesBuilder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
+
+
+/**
+ * This class is responsible for translating the data quality rules into pipeline properties.
+ */
+public class DataQualityPropertiesBuilder extends DetectionConfigPropertiesBuilder {
+
+  static final String PROP_QUALITY_CHECK = "qualityCheck";
+
+  public DataQualityPropertiesBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider provider) {
+    super(metricAttributesMap, provider);
+  }
+
+  /**
+   * Constructs the data quality properties mapping by translating the quality yaml
+   *
+   * @param metricAlertConfigMap holds the parsed yaml for a single metric alert
+   */
+  @Override
+  public Map<String, Object> buildMetricAlertProperties(Map<String, Object> metricAlertConfigMap) {
+    Map<String, Object> properties = new HashMap<>();
+    MetricConfigDTO metricConfigDTO = metricAttributesMap.fetchMetric(metricAlertConfigMap);
+
+    String subEntityName = MapUtils.getString(metricAlertConfigMap, PROP_NAME);
+    Map<String, Object> mergerProperties = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_MERGER));
+
+    Map<String, Collection<String>> dimensionFiltersMap = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_FILTERS));
+    String metricUrn = MetricEntity.fromMetric(dimensionFiltersMap, metricConfigDTO.getId()).getUrn();
+
+    // Translate all the rules
+    List<Map<String, Object>> ruleYamls = getList(metricAlertConfigMap.get(PROP_RULES));
+    List<Map<String, Object>> nestedPipelines = new ArrayList<>();
+    for (Map<String, Object> ruleYaml : ruleYamls) {
+      List<Map<String, Object>> qualityYamls = ConfigUtils.getList(ruleYaml.get(PROP_QUALITY));
+      if (qualityYamls.isEmpty()) {
+        continue;
+      }
+      List<Map<String, Object>> qualityProperties = buildListOfDataQualityProperties(
+          subEntityName, metricUrn, qualityYamls, mergerProperties);
+      nestedPipelines.addAll(qualityProperties);
+    }
+    if (nestedPipelines.isEmpty()) {
+      // No data quality rules
+      return properties;
+    }
+
+    properties.putAll(buildWrapperProperties(DataQualityMergeWrapper.class.getName(), nestedPipelines, mergerProperties));
+    return properties;
+  }
+
+  @Override
+  public Map<String, Object> buildCompositeAlertProperties(Map<String, Object> compositeAlertConfigMap) {
+    Map<String, Object> properties = new HashMap<>();
+
+    // Recursively translate all the sub-alerts
+    List<Map<String, Object>> subDetectionYamls = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_ALERTS));
+    List<Map<String, Object>> nestedPropertiesList = new ArrayList<>();
+    for (Map<String, Object> subDetectionYaml : subDetectionYamls) {
+      Map<String, Object> subProps;
+      if (subDetectionYaml.containsKey(PROP_TYPE) && subDetectionYaml.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
+        subProps = buildCompositeAlertProperties(subDetectionYaml);
+      } else {
+        subProps = buildMetricAlertProperties(subDetectionYaml);
+      }
+
+      nestedPropertiesList.add(subProps);
+    }
+    if (nestedPropertiesList.isEmpty()) {
+      return properties;
+    }
+
+    properties.putAll(compositePropertyBuilderHelper(nestedPropertiesList, compositeAlertConfigMap));
+    return properties;
+  }
+
+  private List<Map<String, Object>> buildListOfDataQualityProperties(String subEntityName, String metricUrn,
+      List<Map<String, Object>> yamlConfigs, Map<String, Object> mergerProperties) {
+    List<Map<String, Object>> properties = new ArrayList<>();
+    for (Map<String, Object> yamlConfig : yamlConfigs) {
+      properties.add(buildDataQualityWrapperProperties(subEntityName, metricUrn, yamlConfig, mergerProperties));
+    }
+    return properties;
+  }
+
+  private Map<String, Object> buildDataQualityWrapperProperties(String subEntityName, String metricUrn,
+      Map<String, Object> yamlConfig, Map<String, Object> mergerProperties) {
+    String qualityType = MapUtils.getString(yamlConfig, PROP_TYPE);
+    String name = MapUtils.getString(yamlConfig, PROP_NAME);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(PROP_CLASS_NAME, DataSlaWrapper.class.getName());
+    properties.put(PROP_SUB_ENTITY_NAME, subEntityName);
+    properties.put(PROP_METRIC_URN, metricUrn);
+
+    String qualityRefKey = makeComponentRefKey(qualityType, name);
+    properties.put(PROP_QUALITY_CHECK, qualityRefKey);
+
+    buildComponentSpec(metricUrn, yamlConfig, qualityType, qualityRefKey);
+
+    properties.putAll(mergerProperties);
+    return properties;
+  }
+}
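Assuming the `slaRule1` example from the commit message, the builder above produces a properties tree shaped roughly as follows - a hand-written illustration with the usual `java.util` imports, not captured output; the alert name and metric id are hypothetical:

```java
Map<String, Object> slaWrapper = new HashMap<>();
slaWrapper.put("className", DataSlaWrapper.class.getName());
slaWrapper.put("subEntityName", "myMetricAlert");        // hypothetical alert name
slaWrapper.put("metricUrn", "thirdeye:metric:123");      // hypothetical metric id
slaWrapper.put("qualityCheck", "$slaRule1:DATA_SLA");    // component reference key

Map<String, Object> qualityProps = new HashMap<>();
qualityProps.put("className", DataQualityMergeWrapper.class.getName());
qualityProps.put("nested", Collections.singletonList(slaWrapper));
```

The `$slaRule1:DATA_SLA` key also lands in the shared component map via `buildComponentSpec`, which is how the `DataSlaWrapper` later resolves its `qualityCheck` component at runtime.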
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigPropertiesBuilder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigPropertiesBuilder.java
new file mode 100644
index 0000000..ee7d626
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigPropertiesBuilder.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
+
+
+/**
+ * This is the root of the detection config property builders. Concrete translators extend this class.
+ */
+public abstract class DetectionConfigPropertiesBuilder {
+
+  public static final String PROP_SUB_ENTITY_NAME = "subEntityName";
+  public static final String PROP_DIMENSION_EXPLORATION = "dimensionExploration";
+  public static final String PROP_FILTERS = "filters";
+
+  static final String PROP_DETECTION = "detection";
+  static final String PROP_FILTER = "filter";
+  static final String PROP_QUALITY = "quality";
+  static final String PROP_TYPE = "type";
+  static final String PROP_CLASS_NAME = "className";
+  static final String PROP_PARAMS = "params";
+  static final String PROP_METRIC_URN = "metricUrn";
+  static final String PROP_DIMENSION_FILTER_METRIC = "dimensionFilterMetric";
+  static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
+  static final String PROP_RULES = "rules";
+  static final String PROP_GROUPER = "grouper";
+  static final String PROP_NESTED = "nested";
+  static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
+  static final String PROP_DETECTOR = "detector";
+  static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
+  static final String PROP_WINDOW_DELAY = "windowDelay";
+  static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+  static final String PROP_WINDOW_SIZE = "windowSize";
+  static final String PROP_WINDOW_UNIT = "windowUnit";
+  static final String PROP_FREQUENCY = "frequency";
+  static final String PROP_MERGER = "merger";
+  static final String PROP_TIMEZONE = "timezone";
+  static final String PROP_NAME = "name";
+
+  static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
+  static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+  static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
+
+  static final String PROP_ALERTS = "alerts";
+  static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
+
+  final DetectionMetricAttributeHolder metricAttributesMap;
+  final DataProvider dataProvider;
+  static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
+
+  DetectionConfigPropertiesBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider dataProvider) {
+    this.metricAttributesMap = metricAttributesMap;
+    this.dataProvider = dataProvider;
+  }
+
+  public abstract Map<String, Object> buildMetricAlertProperties(Map<String, Object> yamlConfigMap) throws IllegalArgumentException;
+
+  public abstract Map<String, Object> buildCompositeAlertProperties(Map<String, Object> yamlConfigMap) throws IllegalArgumentException;
+
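+  /**
+   * Builds the dimension exploration (DimensionWrapper) properties from the yaml config.
+   * When a dimensionFilterMetric is configured, the dimensions are explored on that
+   * metric's urn; otherwise the alert metric's urn is used.
+   */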
+  Map<String, Object> buildDimensionWrapperProperties(Map<String, Object> yamlConfigMap,
+      Map<String, Collection<String>> dimensionFilters, String metricUrn, String datasetName) {
+    Map<String, Object> dimensionWrapperProperties = new HashMap<>();
+    dimensionWrapperProperties.put(PROP_NESTED_METRIC_URNS, Collections.singletonList(metricUrn));
+    if (yamlConfigMap.containsKey(PROP_DIMENSION_EXPLORATION)) {
+      Map<String, Object> dimensionExploreYaml = ConfigUtils.getMap(yamlConfigMap.get(PROP_DIMENSION_EXPLORATION));
+      dimensionWrapperProperties.putAll(dimensionExploreYaml);
+      if (dimensionExploreYaml.containsKey(PROP_DIMENSION_FILTER_METRIC)){
+        MetricConfigDTO dimensionExploreMetric = this.dataProvider.fetchMetric(MapUtils.getString(dimensionExploreYaml, PROP_DIMENSION_FILTER_METRIC), datasetName);
+        dimensionWrapperProperties.put(PROP_METRIC_URN, MetricEntity.fromMetric(dimensionFilters, dimensionExploreMetric.getId()).getUrn());
+      } else {
+        dimensionWrapperProperties.put(PROP_METRIC_URN, metricUrn);
+      }
+    }
+    return dimensionWrapperProperties;
+  }
+
+  Map<String, Object> buildGroupWrapperProperties(String entityName, Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
+    return buildGroupWrapperProperties(entityName, null, grouperYaml, nestedProps);
+  }
+
+  Map<String, Object> buildGroupWrapperProperties(String entityName, String metricUrn,
+      Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(PROP_CLASS_NAME, GrouperWrapper.class.getName());
+    properties.put(PROP_NESTED, nestedProps);
+    properties.put(PROP_SUB_ENTITY_NAME, entityName);
+
+    String grouperType = MapUtils.getString(grouperYaml, PROP_TYPE);
+    String grouperName = MapUtils.getString(grouperYaml, PROP_NAME);
+    String grouperRefKey = makeComponentRefKey(grouperType, grouperName);
+    properties.put(PROP_GROUPER, grouperRefKey);
+
+    buildComponentSpec(metricUrn, grouperYaml, grouperType, grouperRefKey);
+
+    return properties;
+  }
+
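+  /**
+   * Wraps the nested detection properties with an anomaly filter wrapper. If no filter
+   * is configured in the yaml, the nested properties are returned unchanged.
+   */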
+  List<Map<String, Object>> buildFilterWrapperProperties(String metricUrn, String wrapperClassName,
+      Map<String, Object> yamlConfig, List<Map<String, Object>> nestedProperties) {
+    if (yamlConfig == null || yamlConfig.isEmpty()) {
+      return nestedProperties;
+    }
+    Map<String, Object> wrapperProperties = buildWrapperProperties(wrapperClassName, nestedProperties);
+    if (wrapperProperties.isEmpty()) {
+      return Collections.emptyList();
+    }
+    String name = MapUtils.getString(yamlConfig, PROP_NAME);
+    String filterType = MapUtils.getString(yamlConfig, PROP_TYPE);
+    String filterRefKey = makeComponentRefKey(filterType, name);
+    wrapperProperties.put(PROP_FILTER, filterRefKey);
+    buildComponentSpec(metricUrn, yamlConfig, filterType, filterRefKey);
+
+    return Collections.singletonList(wrapperProperties);
+  }
+
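+  /**
+   * Builds the component spec (class name, metric urn and params) for the given rule
+   * and registers it in the metric attributes holder under the given ref key.
+   */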
+  void buildComponentSpec(String metricUrn, Map<String, Object> yamlConfig, String type, String componentRefKey) {
+    Map<String, Object> componentSpecs = new HashMap<>();
+
+    String componentClassName = DETECTION_REGISTRY.lookup(type);
+    componentSpecs.put(PROP_CLASS_NAME, componentClassName);
+    if (metricUrn != null) {
+      componentSpecs.put(PROP_METRIC_URN, metricUrn);
+    }
+
+    Map<String, Object> params = ConfigUtils.getMap(yamlConfig.get(PROP_PARAMS));
+
+    // For tunable components, the model params are computed from user supplied yaml params and previous model params.
+    // We store the yaml params under a separate key, PROP_YAML_PARAMS, to distinguish from model params.
+    if (DETECTION_REGISTRY.isTunable(componentClassName)) {
+      componentSpecs.put(PROP_YAML_PARAMS, params);
+    } else {
+      componentSpecs.putAll(params);
+    }
+
+    metricAttributesMap.addComponent(DetectionUtils.getComponentKey(componentRefKey), componentSpecs);
+  }
+
+  Map<String, Object> compositePropertyBuilderHelper(List<Map<String, Object>> nestedPropertiesList,
+      Map<String, Object> compositeAlertConfigMap) {
+    Map<String, Object> properties;
+    String subEntityName = MapUtils.getString(compositeAlertConfigMap, PROP_NAME);
+
+    // Wrap with the entity-level grouper; only one grouper is supported for now
+    List<Map<String, Object>> grouperProps = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_GROUPER));
+    Map<String, Object> mergerProperties = ConfigUtils.getMap(compositeAlertConfigMap.get(PROP_MERGER));
+    if (!grouperProps.isEmpty()) {
+      properties = buildWrapperProperties(
+          EntityAnomalyMergeWrapper.class.getName(),
+          Collections.singletonList(buildGroupWrapperProperties(subEntityName, grouperProps.get(0), nestedPropertiesList)),
+          mergerProperties);
+      nestedPropertiesList = Collections.singletonList(properties);
+    }
+
+    return buildWrapperProperties(
+        ChildKeepingMergeWrapper.class.getName(),
+        nestedPropertiesList,
+        mergerProperties);
+  }
+
+  Map<String, Object> buildWrapperProperties(String wrapperClassName,
+      List<Map<String, Object>> nestedProperties) {
+    return buildWrapperProperties(wrapperClassName, nestedProperties, Collections.emptyMap());
+  }
+
+  Map<String, Object> buildWrapperProperties(String wrapperClassName,
+      List<Map<String, Object>> nestedProperties, Map<String, Object> defaultProperties) {
+    Map<String, Object> properties = new HashMap<>();
+    List<Map<String, Object>> wrapperNestedProperties = new ArrayList<>();
+    for (Map<String, Object> nested : nestedProperties) {
+      if (nested != null && !nested.isEmpty()) {
+        wrapperNestedProperties.add(nested);
+      }
+    }
+    if (wrapperNestedProperties.isEmpty()) {
+      return properties;
+    }
+    properties.put(PROP_CLASS_NAME, wrapperClassName);
+    properties.put(PROP_NESTED, wrapperNestedProperties);
+    properties.putAll(defaultProperties);
+    return properties;
+  }
+
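+  // Builds the key that wires a rule to its component spec,
+  // e.g. type "DATA_SLA" with name "slaRule1" becomes "$slaRule1:DATA_SLA".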
+  static String makeComponentRefKey(String type, String name) {
+    return "$" + name + ":" + type;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionPropertiesBuilder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionPropertiesBuilder.java
new file mode 100644
index 0000000..664e40c
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionPropertiesBuilder.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.algorithm.DimensionWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.AnomalyFilterWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.BaselineFillingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
+
+
+/**
+ * This class is responsible for translating the detection, filter and grouper rules of an alert into detection pipeline properties
+ */
+public class DetectionPropertiesBuilder extends DetectionConfigPropertiesBuilder {
+
+  private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM");
+
+  public DetectionPropertiesBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider provider) {
+    super(metricAttributesMap, provider);
+  }
+
+  /**
+   * Constructs the detection properties mapping by translating the detection & filter yaml
+   *
+   * @param metricAlertConfigMap holds the parsed yaml for a single metric alert
+   */
+  @Override
+  public Map<String, Object> buildMetricAlertProperties(Map<String, Object> metricAlertConfigMap) {
+    MetricConfigDTO metricConfigDTO = metricAttributesMap.fetchMetric(metricAlertConfigMap);
+    DatasetConfigDTO datasetConfigDTO = metricAttributesMap.fetchDataset(metricAlertConfigMap);
+
+    String subEntityName = MapUtils.getString(metricAlertConfigMap, PROP_NAME);
+    Map<String, Object> mergerProperties = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_MERGER));
+    Map<String, Collection<String>> dimensionFiltersMap = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_FILTERS));
+    String metricUrn = MetricEntity.fromMetric(dimensionFiltersMap, metricConfigDTO.getId()).getUrn();
+
+    // Translate all the rules
+    List<Map<String, Object>> ruleYamls = getList(metricAlertConfigMap.get(PROP_RULES));
+    List<Map<String, Object>> nestedPipelines = new ArrayList<>();
+    for (Map<String, Object> ruleYaml : ruleYamls) {
+      List<Map<String, Object>> detectionYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
+      List<Map<String, Object>> detectionProperties = buildListOfMergeWrapperProperties(
+          subEntityName, metricUrn, detectionYamls, mergerProperties, datasetConfigDTO.bucketTimeGranularity());
+
+      List<Map<String, Object>> filterYamls = ConfigUtils.getList(ruleYaml.get(PROP_FILTER));
+      if (filterYamls.isEmpty()) {
+        nestedPipelines.addAll(detectionProperties);
+      } else {
+        List<Map<String, Object>> filterNestedProperties = detectionProperties;
+        for (Map<String, Object> filterProperties : filterYamls) {
+          filterNestedProperties = buildFilterWrapperProperties(metricUrn, AnomalyFilterWrapper.class.getName(), filterProperties,
+              filterNestedProperties);
+        }
+        nestedPipelines.addAll(filterNestedProperties);
+      }
+    }
+
+    // Wrap with dimension exploration properties
+    Map<String, Object> dimensionWrapperProperties = buildDimensionWrapperProperties(
+        metricAlertConfigMap, dimensionFiltersMap, metricUrn, datasetConfigDTO.getDataset());
+    Map<String, Object> properties = buildWrapperProperties(
+        ChildKeepingMergeWrapper.class.getName(),
+        Collections.singletonList(buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)),
+        mergerProperties);
+
+    // Wrap with the metric-level grouper; only one grouper is supported
+    List<Map<String, Object>> grouperYamls = getList(metricAlertConfigMap.get(PROP_GROUPER));
+    if (!grouperYamls.isEmpty()) {
+      properties = buildWrapperProperties(
+          EntityAnomalyMergeWrapper.class.getName(),
+          Collections.singletonList(buildGroupWrapperProperties(subEntityName, metricUrn, grouperYamls.get(0), Collections.singletonList(properties))),
+          mergerProperties);
+
+      properties = buildWrapperProperties(
+          ChildKeepingMergeWrapper.class.getName(),
+          Collections.singletonList(properties),
+          mergerProperties);
+    }
+
+    return properties;
+  }
+
+  @Override
+  public Map<String, Object> buildCompositeAlertProperties(Map<String, Object> compositeAlertConfigMap) {
+    // Recursively translate all the sub-alerts
+    List<Map<String, Object>> subDetectionYamls = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_ALERTS));
+    List<Map<String, Object>> nestedPropertiesList = new ArrayList<>();
+    for (Map<String, Object> subDetectionYaml : subDetectionYamls) {
+      Map<String, Object> subProps;
+      if (subDetectionYaml.containsKey(PROP_TYPE) && subDetectionYaml.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
+        subProps = buildCompositeAlertProperties(subDetectionYaml);
+      } else {
+        subProps = buildMetricAlertProperties(subDetectionYaml);
+      }
+
+      nestedPropertiesList.add(subProps);
+    }
+
+    return compositePropertyBuilderHelper(nestedPropertiesList, compositeAlertConfigMap);
+  }
+
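+  // Translates each detection rule in the yaml block into a baseline-filling merge wrapper.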
+  private List<Map<String, Object>> buildListOfMergeWrapperProperties(String subEntityName, String metricUrn,
+      List<Map<String, Object>> yamlConfigs, Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
+    List<Map<String, Object>> properties = new ArrayList<>();
+    for (Map<String, Object> yamlConfig : yamlConfigs) {
+      properties.add(buildMergeWrapperProperties(subEntityName, metricUrn, yamlConfig, mergerProperties, datasetTimegranularity));
+    }
+    return properties;
+  }
+
+  private Map<String, Object> buildMergeWrapperProperties(String subEntityName, String metricUrn, Map<String, Object> yamlConfig,
+      Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
+    String detectorType = MapUtils.getString(yamlConfig, PROP_TYPE);
+    String name = MapUtils.getString(yamlConfig, PROP_NAME);
+    Map<String, Object> nestedProperties = new HashMap<>();
+    nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
+    nestedProperties.put(PROP_SUB_ENTITY_NAME, subEntityName);
+    String detectorRefKey = makeComponentRefKey(detectorType, name);
+
+    fillInDetectorWrapperProperties(nestedProperties, yamlConfig, detectorType, datasetTimegranularity);
+
+    buildComponentSpec(metricUrn, yamlConfig, detectorType, detectorRefKey);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(PROP_CLASS_NAME, BaselineFillingMergeWrapper.class.getName());
+    properties.put(PROP_NESTED, Collections.singletonList(nestedProperties));
+    properties.put(PROP_DETECTOR, detectorRefKey);
+
+    // fill in baseline provider properties
+    if (DETECTION_REGISTRY.isBaselineProvider(detectorType)) {
+      // if the detector implements the baseline provider interface, use it to generate baseline
+      properties.put(PROP_BASELINE_PROVIDER, detectorRefKey);
+    } else {
+      String baselineProviderType = DEFAULT_BASELINE_PROVIDER_YAML_TYPE;
+      String baselineProviderKey = makeComponentRefKey(baselineProviderType, name);
+      buildComponentSpec(metricUrn, yamlConfig, baselineProviderType, baselineProviderKey);
+      properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
+    }
+    properties.putAll(mergerProperties);
+    return properties;
+  }
+
+  // fill in window size and unit if detector requires this
+  private static void fillInDetectorWrapperProperties(Map<String, Object> properties, Map<String, Object> yamlConfig, String detectorType, TimeGranularity datasetTimegranularity) {
+    // set default bucketPeriod
+    properties.put(PROP_BUCKET_PERIOD, datasetTimegranularity.toPeriod().toString());
+
+    // override bucketPeriod now since it is needed by detection window
+    if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
+      properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig, PROP_BUCKET_PERIOD));
+    }
+
+    // set default detection window
+    setDefaultDetectionWindow(properties, detectorType);
+
+    // override other properties from yaml
+    if (yamlConfig.containsKey(PROP_WINDOW_SIZE)) {
+      properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+      properties.put(PROP_WINDOW_SIZE, MapUtils.getString(yamlConfig, PROP_WINDOW_SIZE));
+    }
+    if (yamlConfig.containsKey(PROP_WINDOW_UNIT)) {
+      properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+      properties.put(PROP_WINDOW_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_UNIT));
+    }
+    if (yamlConfig.containsKey(PROP_WINDOW_DELAY)) {
+      properties.put(PROP_WINDOW_DELAY, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY));
+    }
+    if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
+      properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
+    }
+    if (yamlConfig.containsKey(PROP_TIMEZONE)){
+      properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
+    }
+    if (yamlConfig.containsKey(PROP_CACHE_PERIOD_LOOKBACK)) {
+      properties.put(PROP_CACHE_PERIOD_LOOKBACK, MapUtils.getString(yamlConfig, PROP_CACHE_PERIOD_LOOKBACK));
+    }
+  }
+
+  // Set the default detection window if it is not specified.
+  // Here, instead of the data granularity, we use the detection period (bucketPeriod) to pick the default window size.
+  private static void setDefaultDetectionWindow(Map<String, Object> properties, String detectorType) {
+    if (MOVING_WINDOW_DETECTOR_TYPES.contains(detectorType)) {
+      properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+      org.joda.time.Period detectionPeriod =
+          org.joda.time.Period.parse(MapUtils.getString(properties, PROP_BUCKET_PERIOD));
+      int days = detectionPeriod.toStandardDays().getDays();
+      int hours = detectionPeriod.toStandardHours().getHours();
+      int minutes = detectionPeriod.toStandardMinutes().getMinutes();
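+      // Defaults derived from the bucket period: daily data gets a 1-day window,
+      // hourly data a 24-hour window, and minute-level data a 6-hour window
+      // scanned every 15 minutes.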
+      if (days >= 1) {
+        properties.put(PROP_WINDOW_SIZE, 1);
+        properties.put(PROP_WINDOW_UNIT, TimeUnit.DAYS);
+      } else if (hours >= 1) {
+        properties.put(PROP_WINDOW_SIZE, 24);
+        properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+      } else if (minutes >= 1) {
+        properties.put(PROP_WINDOW_SIZE, 6);
+        properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+        properties.put(PROP_FREQUENCY, new TimeGranularity(15, TimeUnit.MINUTES));
+      } else {
+        properties.put(PROP_WINDOW_SIZE, 6);
+        properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+      }
+    }
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java
index b64075e..127b8a8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java
@@ -46,15 +46,12 @@ import org.apache.pinot.thirdeye.datasource.DAORegistry;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.health.DetectionHealth;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import org.joda.time.DateTime;
 import org.joda.time.Period;
 import org.joda.time.PeriodType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
 import static org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator.*;
-import static org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator.*;
+import static org.apache.pinot.thirdeye.detection.yaml.translator.builder.DetectionConfigPropertiesBuilder.*;
 
 
 /**
@@ -84,9 +81,6 @@ public class DetectionConfigFormatter implements DTOFormatter<DetectionConfigDTO
   private static final String PROP_NESTED_PROPERTIES_KEY = "nested";
   private static final String PROP_MONITORING_GRANULARITY = "monitoringGranularity";
   private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
-  private static final String PROP_VARIABLES_BUCKET_PERIOD = "variables.bucketPeriod";
-
-  private static final Logger LOG = LoggerFactory.getLogger(DetectionConfigFormatter.class);
 
   private static final long DEFAULT_PRESENTING_WINDOW_SIZE_MINUTELY = TimeUnit.HOURS.toMillis(48);
   private static final long DEFAULT_PRESENTING_WINDOW_SIZE_DAILY = TimeUnit.DAYS.toMillis(30);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java
index f1174ce..8d9b1f4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java
@@ -318,7 +318,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
     String liftValue = String.format(PERCENTAGE_FORMAT, lift * 100);
 
     // Fetch the lift value for a SLA anomaly
-    if (anomaly.getType().equals(AnomalyType.DATA_MISSING)) {
+    if (anomaly.getType().equals(AnomalyType.DATA_SLA)) {
       liftValue = getFormattedSLALiftValue(anomaly);
     }
 
@@ -329,7 +329,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
    * The lift value for an SLA anomaly is delay from the configured sla. (Ex: 2 days & 3 hours)
    */
   protected static String getFormattedSLALiftValue(MergedAnomalyResultDTO anomaly) {
-    if (!anomaly.getType().equals(AnomalyType.DATA_MISSING)
+    if (!anomaly.getType().equals(AnomalyType.DATA_SLA)
         || anomaly.getProperties() == null || anomaly.getProperties().isEmpty()
         || !anomaly.getProperties().containsKey("sla")
         || !anomaly.getProperties().containsKey("datasetLastRefreshTime")) {
@@ -357,7 +357,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
    * The predicted value for an SLA anomaly is the configured sla. (Ex: 2_DAYS)
    */
   protected static String getSLAPredictedValue(MergedAnomalyResultDTO anomaly) {
-    if (!anomaly.getType().equals(AnomalyType.DATA_MISSING)
+    if (!anomaly.getType().equals(AnomalyType.DATA_SLA)
         || anomaly.getProperties() == null || anomaly.getProperties().isEmpty()
         || !anomaly.getProperties().containsKey("sla")) {
       return "-";
@@ -373,7 +373,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
     String predicted = ThirdEyeUtils.getRoundedValue(anomaly.getAvgBaselineVal());
 
     // For SLA anomalies, we use the sla as the predicted value
-    if (anomaly.getType().equals(AnomalyType.DATA_MISSING)) {
+    if (anomaly.getType().equals(AnomalyType.DATA_SLA)) {
       predicted = getSLAPredictedValue(anomaly);
     }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
index 271eedb..216784a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
@@ -20,6 +20,7 @@
 package org.apache.pinot.thirdeye.scheduler;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
@@ -31,7 +32,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.thirdeye.datalayer.pojo.AbstractBean;
 import org.apache.pinot.thirdeye.datalayer.pojo.DetectionConfigBean;
-import org.apache.pinot.thirdeye.detection.DetectionDataSLAJob;
+import org.apache.pinot.thirdeye.detection.dataquality.DataQualityPipelineJob;
 import org.apache.pinot.thirdeye.detection.DetectionPipelineJob;
 import org.apache.pinot.thirdeye.detection.DetectionUtils;
 import org.apache.pinot.thirdeye.detection.TaskUtils;
@@ -55,6 +56,7 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
 
   public static final int DEFAULT_DETECTION_DELAY = 1;
   public static final TimeUnit DEFAULT_ALERT_DELAY_UNIT = TimeUnit.MINUTES;
+  public static final String QUARTZ_DETECTION_GROUPER = TaskConstants.TaskType.DETECTION.toString();
 
   final DetectionConfigManager detectionDAO;
   final Scheduler scheduler;
@@ -79,28 +81,45 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
 
       // add or update
       for (DetectionConfigDTO config : configs) {
-        JobKey key = new JobKey(getJobKey(config.getId()), TaskConstants.TaskType.DETECTION.toString());
         if (!config.isActive()) {
-          LOG.debug("Detection config " + key + " is inactive. Skipping.");
+          LOG.debug("Detection config " + config.getId() + " is inactive. Skipping.");
           continue;
         }
-        if (DetectionUtils.isDataAvailabilityCheckEnabled(config)) {
-          LOG.debug("Detection config " + key + " is enabled for data availability scheduling. Skipping.");
+        if (config.isDataAvailabilitySchedule()) {
+          LOG.debug("Detection config " + config.getId() + " is enabled for data availability scheduling. Skipping.");
           continue;
         }
+
         try {
-          if (scheduler.checkExists(key)) {
-            LOG.info("Detection config  " + key.getName() + " is already scheduled");
-            if (isJobUpdated(config, key)) {
-              // restart job
-              stopJob(key);
-              startJob(config, key);
+          // Schedule detection jobs
+          JobKey detectionJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DETECTION),
+              QUARTZ_DETECTION_GROUPER);
+          JobDetail detectionJob = JobBuilder.newJob(DetectionPipelineJob.class).withIdentity(detectionJobKey).build();
+          if (scheduler.checkExists(detectionJobKey)) {
+            LOG.info("Detection config " + detectionJobKey.getName() + " is already scheduled for detection");
+            if (isJobUpdated(config, detectionJobKey)) {
+              restartJob(config, detectionJob);
+            }
+          } else {
+            startJob(config, detectionJob);
+          }
+
+          // Schedule data quality jobs
+          JobKey dataQualityJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DATA_QUALITY),
+              QUARTZ_DETECTION_GROUPER);
+          JobDetail dataQualityJob = JobBuilder.newJob(DataQualityPipelineJob.class).withIdentity(dataQualityJobKey).build();
+          if (scheduler.checkExists(dataQualityJobKey)) {
+            LOG.info("Detection config " + dataQualityJobKey.getName() + " is already scheduled for data quality");
+            if (isJobUpdated(config, dataQualityJobKey)) {
+              restartJob(config, dataQualityJob);
             }
           } else {
-            startJob(config, key);
+            if (DetectionUtils.isDataQualityCheckEnabled(config)) {
+              startJob(config, dataQualityJob);
+            }
           }
         } catch (Exception e) {
-          LOG.error("Error creating/updating job key {}", key);
+          LOG.error("Error creating/updating job key for detection config {}", config.getId());
         }
       }
 
@@ -131,9 +150,14 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
     }
   }
 
+  private void restartJob(DetectionConfigDTO config, JobDetail job) throws SchedulerException {
+    stopJob(job.getKey());
+    startJob(config, job);
+  }
+
   @Override
   public Set<JobKey> getScheduledJobs() throws SchedulerException {
-    return this.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(TaskConstants.TaskType.DETECTION.toString()));
+    return this.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(QUARTZ_DETECTION_GROUPER));
   }
 
   @Override
@@ -143,20 +167,11 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
   }
 
   @Override
-  public void startJob(AbstractBean config, JobKey key) throws SchedulerException {
+  public void startJob(AbstractBean config, JobDetail job) throws SchedulerException {
     Trigger trigger = TriggerBuilder.newTrigger().withSchedule(
         CronScheduleBuilder.cronSchedule(((DetectionConfigBean) config).getCron())).build();
-    JobDetail detectionJob = JobBuilder.newJob(DetectionPipelineJob.class).withIdentity(key).build();
-
-    this.scheduler.scheduleJob(detectionJob, trigger);
-    LOG.info(String.format("scheduled detection pipeline job %s", detectionJob.getKey().getName()));
-
-    // Data SLA alerts will be scheduled only when enabled by the user.
-    if (DetectionUtils.isDataAvailabilityCheckEnabled((DetectionConfigDTO) config)) {
-      JobDetail dataSLAJob = JobBuilder.newJob(DetectionDataSLAJob.class).withIdentity(key).build();
-      this.scheduler.scheduleJob(dataSLAJob, trigger);
-      LOG.info(String.format("scheduled data sla jobs %s", dataSLAJob.getKey().getName()));
-    }
+    this.scheduler.scheduleJob(job, trigger);
+    LOG.info(String.format("scheduled detection pipeline job %s", job.getKey().getName()));
   }
 
   @Override
@@ -170,8 +185,8 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
   }
 
   @Override
-  public String getJobKey(Long id) {
-    return String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
+  public String getJobKey(Long id, TaskConstants.TaskType taskType) {
+    return String.format("%s_%d", taskType, id);
   }
 
   private boolean isJobUpdated(DetectionConfigDTO config, JobKey key) throws SchedulerException {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java
index a44d9a7..afac377 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java
@@ -59,6 +59,7 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
 
   private static final int DEFAULT_ALERT_DELAY = 1;
   private static final TimeUnit DEFAULT_ALERT_DELAY_UNIT = TimeUnit.MINUTES;
+  public static final String QUARTZ_SUBSCRIPTION_GROUPER = TaskConstants.TaskType.DETECTION_ALERT.toString();
 
   final Scheduler scheduler;
   private ScheduledExecutorService scheduledExecutorService;
@@ -111,7 +112,7 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
 
   @Override
   public Set<JobKey> getScheduledJobs() throws SchedulerException {
-    return scheduler.getJobKeys(GroupMatcher.jobGroupEquals(TaskConstants.TaskType.DETECTION_ALERT.toString()));
+    return scheduler.getJobKeys(GroupMatcher.jobGroupEquals(QUARTZ_SUBSCRIPTION_GROUPER));
   }
 
   @Override
@@ -121,12 +122,11 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
   }
 
   @Override
-  public void startJob(AbstractBean config, JobKey key) throws SchedulerException {
+  public void startJob(AbstractBean config, JobDetail job) throws SchedulerException {
     Trigger trigger = TriggerBuilder.newTrigger().withSchedule(
         CronScheduleBuilder.cronSchedule(((DetectionAlertConfigBean) config).getCronExpression())).build();
-    JobDetail job = JobBuilder.newJob(DetectionAlertJob.class).withIdentity(key).build();
     this.scheduler.scheduleJob(job, trigger);
-    LOG.info(String.format("scheduled subscription pipeline job %s", key.getName()));
+    LOG.info(String.format("scheduled subscription pipeline job %s", job.getKey().getName()));
   }
 
   @Override
@@ -139,8 +139,8 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
   }
 
   @Override
-  public String getJobKey(Long id) {
-    return String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, id);
+  public String getJobKey(Long id, TaskConstants.TaskType taskType) {
+    return String.format("%s_%d", taskType, id);
   }
 
   private void deleteAlertJob(JobKey scheduledJobKey) throws SchedulerException {
@@ -157,7 +157,8 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
     Long id = alertConfig.getId();
     boolean isActive = alertConfig.isActive();
 
-    JobKey key = new JobKey(getJobKey(id), TaskConstants.TaskType.DETECTION_ALERT.toString());
+    JobKey key = new JobKey(getJobKey(id, TaskConstants.TaskType.DETECTION_ALERT), QUARTZ_SUBSCRIPTION_GROUPER);
+    JobDetail job = JobBuilder.newJob(DetectionAlertJob.class).withIdentity(key).build();
     boolean isScheduled = scheduledJobs.contains(key);
 
     if (isActive) {
@@ -173,11 +174,11 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
               "Cron expression for config {} with jobKey {} has been changed from {}  to {}. " + "Restarting schedule",
               id, key, cronInSchedule, cronInDatabase);
           stopJob(key);
-          startJob(alertConfig, key);
+          startJob(alertConfig, job);
         }
       } else {
         LOG.info("Found active but not scheduled {}", id);
-        startJob(alertConfig, key);
+        startJob(alertConfig, job);
       }
     } else {
       if (isScheduled) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java
index eb3549c..62c3d88 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java
@@ -23,7 +23,9 @@
 package org.apache.pinot.thirdeye.scheduler;
 
 import java.util.Set;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.datalayer.pojo.AbstractBean;
+import org.quartz.JobDetail;
 import org.quartz.JobKey;
 import org.quartz.SchedulerException;
 
@@ -40,7 +42,7 @@ public interface ThirdEyeCronScheduler extends Runnable {
   void shutdown() throws SchedulerException;
 
   // Trigger the scheduler to start creating jobs.
-  void startJob(AbstractBean config, JobKey key) throws SchedulerException;
+  void startJob(AbstractBean config, JobDetail job) throws SchedulerException;
 
   // Stop the scheduler from scheduling more jobs.
   void stopJob(JobKey key) throws SchedulerException;
@@ -49,5 +51,5 @@ public interface ThirdEyeCronScheduler extends Runnable {
   Set<JobKey> getScheduledJobs() throws SchedulerException;
 
   // Get the key for the scheduling job
-  String getJobKey(Long id);
+  String getJobKey(Long id, TaskConstants.TaskType taskType);
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java
index 7861dec..ca2ef7c 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java
@@ -241,7 +241,7 @@ public class DataAvailabilityTaskSchedulerTest {
     Map<String, Object> metricSla = new HashMap<>();
     Map<String, Object> props = new HashMap<>();
     props.put("testMetricUrn", metricSla);
-    detection.setDataSLAProperties(props);
+    detection.setDataQualityProperties(props);
     long detection1 = detectionConfigDAO.save(detection);
     long halfHourAgo = TEST_TIME - TimeUnit.MINUTES.toMillis(30);
     createDataset(1, TEST_TIME, halfHourAgo);
@@ -258,7 +258,7 @@ public class DataAvailabilityTaskSchedulerTest {
     jobNames.add(waitingTasks.get(0).getJobName());
     jobNames.add(waitingTasks.get(1).getJobName());
     Assert.assertTrue(jobNames.contains(TaskConstants.TaskType.DETECTION.toString() + "_" + detection1));
-    Assert.assertTrue(jobNames.contains(TaskConstants.TaskType.DATA_SLA.toString() + "_" + detection1));
+    Assert.assertTrue(jobNames.contains(TaskConstants.TaskType.DATA_QUALITY.toString() + "_" + detection1));
   }
 
   @Test
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
index 364c80c..965d6fd 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
@@ -32,6 +32,7 @@ import org.apache.pinot.thirdeye.datalayer.dto.EvaluationDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
 import org.apache.pinot.thirdeye.detection.spi.model.EvaluationSlice;
 import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
@@ -217,6 +218,18 @@ public class MockDataProvider implements DataProvider {
     return this.loader.from(this, config, start, end);
   }
 
+  @Override
+  public List<DatasetConfigDTO> fetchDatasetByDisplayName(String datasetDisplayName) {
+    List<DatasetConfigDTO> datasetConfigDTOs = new ArrayList<>();
+    for (DatasetConfigDTO datasetConfigDTO : getDatasets()) {
+      if (datasetConfigDTO.getDisplayName().equals(datasetDisplayName)) {
+        datasetConfigDTOs.add(datasetConfigDTO);
+      }
+    }
+
+    return datasetConfigDTOs;
+  }
+
   public Map<MetricSlice, DataFrame> getTimeseries() {
     return timeseries;
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
new file mode 100644
index 0000000..963946d
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
@@ -0,0 +1,642 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AbstractDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+public class DataQualityTaskRunnerTest {
+  private DetectionPipelineTaskInfo info;
+  private TaskContext context;
+  private DAOTestBase testDAOProvider;
+  private DetectionConfigManager detectionDAO;
+  private DatasetConfigManager datasetDAO;
+  private MergedAnomalyResultManager anomalyDAO;
+  private EvaluationManager evaluationDAO;
+  private MetricConfigDTO metricConfigDTO;
+  private DatasetConfigDTO datasetConfigDTO;
+  private DetectionConfigDTO detectionConfigDTO;
+
+  private long detectorId;
+  private DetectionPipelineLoader loader;
+  private DataProvider provider;
+
+  private static final long GRANULARITY = TimeUnit.DAYS.toMillis(1);
+  private static final long START_TIME = new DateTime(System.currentTimeMillis(), DateTimeZone.forID("UTC"))
+      .withMillisOfDay(0).getMillis();
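+  // START_TIME is midnight UTC of the current day; all offsets below are in days (GRANULARITY).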
+
+  @BeforeMethod
+  public void beforeMethod() throws IOException {
+    this.testDAOProvider = DAOTestBase.getInstance();
+    this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.loader = new DetectionPipelineLoader();
+
+    DetectionRegistry.registerComponent(DataSlaQualityChecker.class.getName(), "DATA_SLA");
+
+    metricConfigDTO = new MetricConfigDTO();
+    metricConfigDTO.setId(123L);
+    metricConfigDTO.setName("thirdeye-test");
+    metricConfigDTO.setDataset("thirdeye-test-dataset");
+
+    datasetConfigDTO = new DatasetConfigDTO();
+    datasetConfigDTO.setId(124L);
+    datasetConfigDTO.setDataset("thirdeye-test-dataset");
+    datasetConfigDTO.setDisplayName("thirdeye-test-dataset");
+    datasetConfigDTO.setTimeDuration(1);
+    datasetConfigDTO.setTimeUnit(TimeUnit.DAYS);
+    datasetConfigDTO.setTimezone("UTC");
+    datasetConfigDTO.setLastRefreshTime(START_TIME + 4 * GRANULARITY - 1);
+    datasetConfigDTO.setDataSource("TestSource");
+    this.datasetDAO.save(datasetConfigDTO);
+
+    this.provider = new MockDataProvider()
+        .setLoader(this.loader)
+        .setMetrics(Collections.singletonList(metricConfigDTO))
+        .setDatasets(Collections.singletonList(datasetConfigDTO))
+        .setAnomalies(Collections.emptyList());
+
+    detectionConfigDTO = translateSlaConfig(-1, "sla-config-1.yaml");
+    this.detectorId = detectionConfigDTO.getId();
+
+    this.info = new DetectionPipelineTaskInfo();
+    this.info.setConfigId(this.detectorId);
+    this.context = new TaskContext();
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void afterMethod() {
+    this.testDAOProvider.cleanup();
+  }
+
+  /**
+   * Loads the detection config from filePath; creates a new config when detectionId < 0, otherwise updates the config with that id
+   */
+  private DetectionConfigDTO translateSlaConfig(long detectionId, String filePath) throws IOException {
+    String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream(filePath), StandardCharsets.UTF_8);
+    DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+    DetectionConfigDTO detectionConfigDTO = translator.translate();
+    if (detectionId < 0) {
+      detectionConfigDTO.setId(this.detectionDAO.save(detectionConfigDTO));
+    } else {
+      detectionConfigDTO.setId(detectionId);
+      this.detectionDAO.update(detectionConfigDTO);
+    }
+    return detectionConfigDTO;
+  }
+
+  /**
+   * Prepare slice [offsetStart, offsetEnd)
+   */
+  private MetricSlice prepareMetricSlice(long offsetStart, long offsetEnd) {
+    long start = START_TIME + offsetStart * GRANULARITY;
+    long end = START_TIME + offsetEnd * GRANULARITY;
+    return MetricSlice.from(123L, start, end);
+  }
+
+  private Map<MetricSlice, DataFrame> prepareMockTimeseriesMap(long offsetStart, long offsetEnd) {
+    Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
+    MetricSlice slice = prepareMetricSlice(offsetStart, offsetEnd);
+    timeSeries.put(slice, prepareMockTimeseries(offsetStart, offsetEnd));
+    return timeSeries;
+  }
+
+  /**
+   * Prepares a mock time-series over [offsetStart, offsetEnd] with geometric progression values starting from 100
+   */
+  private DataFrame prepareMockTimeseries(long offsetStart, long offsetEnd) {
+    long[] values = new long[(int) (offsetEnd - offsetStart + 1)];
+    long[] timestamps = new long[(int) (offsetEnd - offsetStart + 1)];
+
+    long gpFactor = 2;
+    long currentValue = 100;
+    for (long i = offsetStart; i <= offsetEnd; i++) {
+      int index = (int) (i - offsetStart);
+      values[index] = currentValue;
+      timestamps[index] = START_TIME + i * GRANULARITY;
+      currentValue = currentValue * gpFactor;
+    }
+
+    return new DataFrame()
+        .addSeries(COL_VALUE, values)
+        .addSeries(COL_TIME, timestamps);
+  }
+
+  private List<MergedAnomalyResultDTO> retrieveAllAnomalies() {
+    List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
+    // remove id and create_time for comparison purposes
+    anomalies.forEach(anomaly -> anomaly.setId(null));
+    anomalies.forEach(anomaly -> anomaly.setCreatedTime(null));
+    return anomalies;
+  }
+
+  private void cleanUpAnomalies() {
+    List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
+    anomalyDAO.deleteByIds(anomalies.stream().map(AbstractDTO::getId).collect(Collectors.toList()));
+  }
+
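+  /**
+   * Builds the data SLA anomaly expected from the pipeline for the window
+   * [offsetStart, offsetEnd) with the given sla setting and rule name.
+   */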
+  private MergedAnomalyResultDTO makeSlaAnomaly(long offsetStart, long offsetEnd, String sla, String ruleName) {
+    MergedAnomalyResultDTO anomalyDTO = new MergedAnomalyResultDTO();
+    anomalyDTO.setStartTime(START_TIME + offsetStart * GRANULARITY);
+    anomalyDTO.setEndTime(START_TIME + offsetEnd * GRANULARITY);
+    anomalyDTO.setMetricUrn("thirdeye:metric:123");
+    anomalyDTO.setMetric("thirdeye-test");
+    anomalyDTO.setCollection("thirdeye-test-dataset");
+    anomalyDTO.setType(AnomalyType.DATA_SLA);
+    anomalyDTO.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
+    anomalyDTO.setDetectionConfigId(this.detectorId);
+    anomalyDTO.setChildIds(Collections.emptySet());
+
+    Map<String, String> anomalyProps = new HashMap<>();
+    long start = START_TIME + offsetStart * GRANULARITY;
+    anomalyProps.put("datasetLastRefreshTime", String.valueOf(start - 1));
+    anomalyProps.put("sla", sla);
+    anomalyProps.put("detectorComponentName", ruleName);
+    anomalyProps.put("subEntityName", "test_sla_alert");
+    anomalyDTO.setProperties(anomalyProps);
+    return anomalyDTO;
+  }
+
+  /**
+   * Test that a data SLA anomaly is created when data is not available
+   */
+  @Test
+  public void testDataSlaWhenDataIsMissing() throws Exception {
+    Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap(0, 3);
+    MockDataProvider mockDataProvider = new MockDataProvider()
+        .setLoader(this.loader)
+        .setTimeseries(timeSeries)
+        .setMetrics(Collections.singletonList(metricConfigDTO))
+        .setDatasets(Collections.singletonList(datasetConfigDTO))
+        .setAnomalies(Collections.emptyList());
+    DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+        this.detectionDAO,
+        this.anomalyDAO,
+        this.evaluationDAO,
+        this.loader,
+        mockDataProvider
+    );
+
+    // CHECK 0: Report data missing immediately if delayed even by a minute (sla = 0_DAYS)
+    // This comes into effect if the detection runs more frequently than the dataset granularity.
+    detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-0.yaml");
+
+    // 1st scan - sla breach
+    //  time:  3____4    5     // We have data till 3rd, data for 4th is missing/delayed
+    //  scan:       |----|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 5 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // 1 data sla anomaly should be created
+    List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
+    expectedAnomalies.add(makeSlaAnomaly(4, 5, "0_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+
+    // CHECK 1: Report data delayed by at least a day (sla = 1_DAYS)
+    detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-1.yaml");
+
+    // 1st scan - sla breach
+    //  time:  3____4    5     // We have data till 3rd, data for 4th is missing/delayed
+    //  scan:       |----|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 5 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // 1 data sla anomaly should be created
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+
+
+    // CHECK 2: Report data missing when delayed (sla = 2_DAYS)
+    detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-2.yaml");
+
+    // 1st scan after issue - no sla breach
+    //  time:  3____4    5           // We have data till 3rd, data for 4th is missing/delayed
+    //  scan:       |----|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 5 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // no data sla anomaly should be created as the delay is still within the 2-day SLA
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+    // 2nd scan after issue - sla breach
+    //  time:    3____4    5    6    // Data for 4th still hasn't arrived
+    //  scan:         |---------|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 6 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // 1 data sla anomaly should be created as the data now breaches the 2-day SLA (by 1 ms)
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    expectedAnomalies.add(makeSlaAnomaly(4, 6, "2_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+
+
+    // CHECK 3: Report data missing when delayed by (sla = 3_DAYS)
+    detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-3.yaml");
+
+    // 1st scan after issue - no sla breach
+    //  time:  3____4    5           // We have data till 3rd, data for 4th is missing/delayed
+    //  scan:       |----|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 5 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // no data sla anomaly should be created as the delay is still within the 3-day SLA
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+    // 2nd scan after issue - no sla breach
+    //  time:    3____4    5    6         // Data for 4th still hasn't arrived, no sla breach
+    //  scan:         |---------|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 6 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // no data sla anomaly should be created as the delay is still within the 3-day SLA
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+    // 3rd scan after issue - sla breach
+    //  time:    3____4    5    6    7    // Data for 4th still hasn't arrived
+    //  scan:         |--------------|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 7 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // 1 data sla anomaly should be created from 4 to 7 as the data has been missing for 3 days
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    expectedAnomalies.add(makeSlaAnomaly(4, 7, "3_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+
+
+    // CHECK 4: Report data missing when there is a gap between the lastRefreshTime and the SLA window start
+    detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-1.yaml");
+
+    // 2 cases are possible:
+    // a. with availability events
+    // b. no availability events - directly query source for just the window
+
+    // a. with availability events
+    // 1st scan after issue - sla breach
+    //  time:  3____4    5    6           // We have data till 3rd, data since 4th is missing/delayed
+    //  scan:            |----|
+    this.info.setStart(START_TIME + 5 * GRANULARITY);
+    this.info.setEnd(START_TIME + 6 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // 1 data sla anomaly should be created from 4 to 6
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+
+
+    // b. no availability events - directly query source
+    datasetConfigDTO.setLastRefreshTime(0);
+    datasetDAO.update(datasetConfigDTO);
+    // 1st scan after issue - sla breach
+    //  time:  3____4    5    6           // We have data till 3rd, data since 4th is missing/delayed
+    //  scan:            |----|
+    this.info.setStart(START_TIME + 5 * GRANULARITY);
+    this.info.setEnd(START_TIME + 6 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // 1 data sla anomaly should be created from 5 to 6 (without availability events, only the scanned window is flagged)
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    expectedAnomalies.add(makeSlaAnomaly(5, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+  }
+
+  /**
+   * Test that data SLA anomalies are created when data is partially available
+   */
+  @Test
+  public void testDataSlaWhenDataPartiallyAvailable() throws Exception {
+    Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
+    // Assumption - As per data availability info, we have data till 3rd.
+    datasetConfigDTO.setLastRefreshTime(START_TIME + 4 * GRANULARITY - 1);
+    datasetDAO.update(datasetConfigDTO);
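+    // (START_TIME + 4 * GRANULARITY - 1 is the last millisecond of day 3's bucket)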
+
+    MockDataProvider mockDataProvider = new MockDataProvider()
+        .setLoader(this.loader)
+        .setTimeseries(timeSeries)
+        .setMetrics(Collections.singletonList(metricConfigDTO))
+        .setDatasets(Collections.singletonList(datasetConfigDTO))
+        .setAnomalies(Collections.emptyList());
+    DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+        this.detectionDAO,
+        this.anomalyDAO,
+        this.evaluationDAO,
+        this.loader,
+        mockDataProvider
+    );
+
+    // Create detection window (2 - 10) with a lot of empty data points (no data after 3rd).
+    MetricSlice sliceWithManyEmptyDataPoints = prepareMetricSlice(2, 10);
+    timeSeries.put(sliceWithManyEmptyDataPoints, prepareMockTimeseries(2, 3));
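+    // The mock source returns points only for buckets 2 and 3; the rest of the window is empty.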
+    this.info.setStart(sliceWithManyEmptyDataPoints.getStart());
+    this.info.setEnd(sliceWithManyEmptyDataPoints.getEnd());
+    runner.execute(this.info, this.context);
+
+    // 1 data sla anomaly should be created for the missing days
+    List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
+    expectedAnomalies.add(makeSlaAnomaly(4, 10, "1_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+
+
+    // Create another detection window (2 - 10) with no empty data points.
+    MetricSlice sliceWithFullDataPoints = prepareMetricSlice(2, 10);
+    timeSeries.put(sliceWithFullDataPoints, prepareMockTimeseries(2, 9));
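+    // The source now returns data for the whole window, so the data is treated as
+    // complete even though the availability event (lastRefreshTime) is stale.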
+    this.info.setStart(sliceWithFullDataPoints.getStart());
+    this.info.setEnd(sliceWithFullDataPoints.getEnd());
+    runner.execute(this.info, this.context);
+
+    // 0 data sla anomaly should be created as data is considered to be completely available (above threshold)
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+
+    // Create another detection window (2 - 10) with more data points than the data availability event.
+    MetricSlice sliceWithMoreDataPointsThanEvent = prepareMetricSlice(2, 10);
+    timeSeries.put(sliceWithMoreDataPointsThanEvent, prepareMockTimeseries(2, 7));
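+    // The source has data through bucket 7, i.e. beyond what the availability event
+    // (lastRefreshTime at the end of day 3) reports; only (8, 10) should be flagged.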
+    this.info.setStart(sliceWithMoreDataPointsThanEvent.getStart());
+    this.info.setEnd(sliceWithMoreDataPointsThanEvent.getEnd());
+    runner.execute(this.info, this.context);
+
+    // 1 data sla anomaly should be created for the missing days
+    // The data source has more data than the availability event reports; the sla anomaly should reflect that.
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    expectedAnomalies = new ArrayList<>();
+    expectedAnomalies.add(makeSlaAnomaly(8, 10, "1_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+    // clean up
+    expectedAnomalies.clear();
+    cleanUpAnomalies();
+  }
+
+  /**
+   * Test that no Data SLA anomaly is created when data is complete and available
+   */
+  @Test
+  public void testDataSlaWhenDataAvailable() throws Exception {
+    Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap(0, 3);
+    MockDataProvider mockDataProvider = new MockDataProvider()
+        .setLoader(this.loader)
+        .setTimeseries(timeSeries)
+        .setMetrics(Collections.singletonList(metricConfigDTO))
+        .setDatasets(Collections.singletonList(datasetConfigDTO))
+        .setAnomalies(Collections.emptyList());
+    DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+        this.detectionDAO,
+        this.anomalyDAO,
+        this.evaluationDAO,
+        this.loader,
+        mockDataProvider
+    );
+
+    // Prepare the data sla task over existing data in time-series
+    this.info.setStart(START_TIME + 1 * GRANULARITY);
+    this.info.setEnd(START_TIME + 3 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // No data sla anomaly should be created
+    List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+    // Prepare the data sla task over existing data in time-series
+    this.info.setStart(START_TIME);
+    this.info.setEnd(START_TIME + 4 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // No data sla anomaly should be created
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+  }
+
+  /**
+   * Note that we do not support sla on dimensional data. This test validates that no SLA anomalies are created
+   * when some dimensional data is missing but the overall dataset is complete.
+   */
+  @Test
+  public void testDataSlaWhenDimensionalDataUnavailable() throws Exception {
+    Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap(0, 3);
+    // Prepare a mock time-series where the dimensional slice is missing 2 data points
+    // while the overall metric has complete data (4 points) over the cached slice window
+    Multimap<String, String> filters = HashMultimap.create();
+    filters.put("dim1", "1");
+    MetricSlice dimensionSlice = MetricSlice.from(123L, START_TIME, START_TIME + 4 * GRANULARITY, filters);
+    // create 2 data points
+    timeSeries.put(dimensionSlice, prepareMockTimeseries(0, 1));
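+    // The dimensional slice has points only for buckets 0 and 1, while the overall
+    // metric (prepared above) has complete data for buckets 0 through 3.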
+    MockDataProvider mockDataProvider = new MockDataProvider()
+        .setLoader(this.loader)
+        .setTimeseries(timeSeries)
+        .setMetrics(Collections.singletonList(metricConfigDTO))
+        .setDatasets(Collections.singletonList(datasetConfigDTO))
+        .setAnomalies(Collections.emptyList());
+    DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+        this.detectionDAO,
+        this.anomalyDAO,
+        this.evaluationDAO,
+        this.loader,
+        mockDataProvider
+    );
+
+    // Scan a period where dimensional data is missing but dataset is complete.
+    this.info.setStart(START_TIME + 3 * GRANULARITY);
+    this.info.setEnd(START_TIME + 4 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // 0 SLA anomaly should be created. Though dimensional data is missing, the dataset as a whole is complete.
+    List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+    // Prepare the data sla task on unavailable data
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 5 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // 1 SLA anomaly should be created
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    Assert.assertEquals(anomalies.get(0).getStartTime(), START_TIME + 4 * GRANULARITY);
+    Assert.assertEquals(anomalies.get(0).getEndTime(), START_TIME + 5 * GRANULARITY);
+    Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_SLA);
+    Assert.assertEquals(anomalies.get(0).getAnomalyResultSource(), AnomalyResultSource.DATA_QUALITY_DETECTION);
+    Map<String, String> anomalyProps = new HashMap<>();
+    anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * GRANULARITY - 1));
+    anomalyProps.put("sla", "1_DAYS");
+    anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA");
+    anomalyProps.put("subEntityName", "test_sla_alert");
+    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
+  }
+
+  /**
+   * Test if data sla anomalies are properly merged
+   */
+  @Test
+  public void testDataSlaAnomalyMerge() throws Exception {
+    Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap(0, 3);
+    MockDataProvider mockDataProvider = new MockDataProvider()
+        .setLoader(this.loader)
+        .setTimeseries(timeSeries)
+        .setMetrics(Collections.singletonList(metricConfigDTO))
+        .setDatasets(Collections.singletonList(datasetConfigDTO))
+        .setAnomalies(Collections.emptyList());
+    DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+        this.detectionDAO,
+        this.anomalyDAO,
+        this.evaluationDAO,
+        this.loader,
+        mockDataProvider
+    );
+
+    // 1st scan
+    //  time:  3____4    5      // We have data till 3rd, data for 4th is missing/delayed
+    //  scan:       |----|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 5 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // 1 data sla anomaly should be created from 4 to 5
+    List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 1);
+    MergedAnomalyResultDTO slaAnomaly = anomalies.get(0);
+    Assert.assertEquals(slaAnomaly.getStartTime(), START_TIME + 4 * GRANULARITY);
+    Assert.assertEquals(slaAnomaly.getEndTime(), START_TIME + 5 * GRANULARITY);
+    Assert.assertEquals(slaAnomaly.getType(), AnomalyType.DATA_SLA);
+    Assert.assertEquals(slaAnomaly.getAnomalyResultSource(), AnomalyResultSource.DATA_QUALITY_DETECTION);
+    Map<String, String> anomalyProps = new HashMap<>();
+    anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * GRANULARITY - 1));
+    anomalyProps.put("sla", "1_DAYS");
+    anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA");
+    anomalyProps.put("subEntityName", "test_sla_alert");
+    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
+
+    // 2nd scan
+    //  time:    3____4    5    6    // Data for 4th still hasn't arrived
+    //  scan:         |---------|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 6 * GRANULARITY);
+    runner.execute(this.info, this.context);
+
+    // We should now have 2 anomalies in our database (4 to 5) and (4 to 6)
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 2);
+    List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
+    expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
+    expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+
+    // 3rd scan
+    //  time:  3____4____5    6    7    // Data for 4th has arrived by now, data for 5th is still missing
+    //  scan:       |--------------|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 7 * GRANULARITY);
+    datasetConfigDTO.setLastRefreshTime(START_TIME + 5 * GRANULARITY - 1);
+    datasetDAO.update(datasetConfigDTO);
+    runner = new DataQualityPipelineTaskRunner(
+        this.detectionDAO,
+        this.anomalyDAO,
+        this.evaluationDAO,
+        this.loader,
+        mockDataProvider
+    );
+    runner.execute(this.info, this.context);
+
+    // We will now have 3 anomalies in our database
+    // In the current iteration we will create 1 new anomaly from (5 to 7)
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 3);
+    expectedAnomalies = new ArrayList<>();
+    expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
+    expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+    expectedAnomalies.add(makeSlaAnomaly(5, 7, "1_DAYS", "slaRule1:DATA_SLA"));
+    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunnerTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunnerTest.java
deleted file mode 100644
index dd58ef7..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunnerTest.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.datasla;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.pinot.thirdeye.anomaly.AnomalyType;
-import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
-import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-public class DatasetSlaTaskRunnerTest {
-  private DetectionPipelineTaskInfo info;
-  private TaskContext context;
-  private DAOTestBase testDAOProvider;
-  private DetectionConfigManager detectionDAO;
-  private DatasetConfigManager datasetDAO;
-  private MergedAnomalyResultManager anomalyDAO;
-  private EvaluationManager evaluationDAO;
-  private MetricConfigDTO metricConfigDTO;
-  private DatasetConfigDTO datasetConfigDTO;
-  private DetectionConfigDTO detectionConfigDTO;
-  private long detectorId;
-
-  private final Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
-  private DataFrame dataFrame;
-
-  private static long period = TimeUnit.DAYS.toMillis(1);
-  private static DateTimeZone timezone = DateTimeZone.forID("UTC");
-  private static long startTime = new DateTime(System.currentTimeMillis(), timezone).withMillisOfDay(0).getMillis();
-
-  @BeforeMethod
-  public void beforeMethod() {
-    this.testDAOProvider = DAOTestBase.getInstance();
-    this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
-    this.datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
-    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
-    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
-
-    metricConfigDTO = new MetricConfigDTO();
-    metricConfigDTO.setId(123L);
-    metricConfigDTO.setName("thirdeye-test");
-    metricConfigDTO.setDataset("thirdeye-test-dataset");
-
-    datasetConfigDTO = new DatasetConfigDTO();
-    datasetConfigDTO.setId(124L);
-    datasetConfigDTO.setDataset("thirdeye-test-dataset");
-    datasetConfigDTO.setTimeDuration(1);
-    datasetConfigDTO.setTimeUnit(TimeUnit.DAYS);
-    datasetConfigDTO.setTimezone("UTC");
-    datasetConfigDTO.setLastRefreshTime(startTime + 4 * period - 1);
-    this.datasetDAO.save(datasetConfigDTO);
-
-    detectionConfigDTO = new DetectionConfigDTO();
-    detectionConfigDTO.setName("myName");
-    detectionConfigDTO.setDescription("myDescription");
-    detectionConfigDTO.setCron("myCron");
-    Map<String, Object> metricSla = new HashMap<>();
-    metricSla.put("sla", "1_DAYS");
-    Map<String, Object> props = new HashMap<>();
-    props.put("thirdeye:metric:123", metricSla);
-    detectionConfigDTO.setDataSLAProperties(props);
-    this.detectorId = this.detectionDAO.save(detectionConfigDTO);
-
-    this.info = new DetectionPipelineTaskInfo();
-    this.info.setConfigId(this.detectorId);
-    this.context = new TaskContext();
-
-    // Prepare the mock time-series data with 1 ms granularity
-    this.dataFrame = new DataFrame()
-        .addSeries(COL_VALUE, 100, 200, 500, 1000)
-        .addSeries(COL_TIME, startTime, startTime + period, startTime + 2*period, startTime + 3*period);
-    MetricSlice slice = MetricSlice.from(123L, startTime, startTime + 4*period);
-    this.timeSeries.put(slice, dataFrame);
-  }
-
-  @AfterMethod(alwaysRun = true)
-  public void afterMethod() {
-    this.testDAOProvider.cleanup();
-  }
-
-  /**
-   * Test if a Data SLA anomaly is created when data is not available
-   */
-  @Test
-  public void testDataSlaWhenDataIsMissing() throws Exception {
-    MockDataProvider mockDataProvider = new MockDataProvider()
-        .setTimeseries(timeSeries)
-        .setMetrics(Collections.singletonList(metricConfigDTO))
-        .setDatasets(Collections.singletonList(datasetConfigDTO));
-    Map<String, Object> metricSla = new HashMap<>();
-    Map<String, Object> props = new HashMap<>();
-    DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
-    // CHECK 1: Report data missing immediately (sla - 0 ms)
-    metricSla.put("sla", "1_DAYS");
-    props.put("thirdeye:metric:123", metricSla);
-    detectionConfigDTO.setDataSLAProperties(props);
-    this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
-    // 1st scan after issue
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 5 * period);
-    runner.execute(this.info, this.context);
-    // 1 data sla anomaly should be created
-    List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 1);
-    Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
-    Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 5 * period);
-    Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
-    anomalyDAO.delete(anomalies.get(0));  // clean up
-
-
-    // CHECK 2: Report data missing when delayed by (sla - 1 day)
-    metricSla.put("sla", "2_DAYS");
-    props.put("thirdeye:metric:123", metricSla);
-    detectionConfigDTO.setDataSLAProperties(props);
-    this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
-    // 1st scan after issue
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 5 * period);
-    runner.execute(this.info, this.context);
-    // 0 data sla anomaly should be created as there is no delay
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 0);
-
-    // 2nd scan after issue
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 6 * period);
-    runner.execute(this.info, this.context);
-    // 1 data sla anomaly should be created as data is delayed by 1 ms
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 1);
-    Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
-    Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 6 * period);
-    Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
-    Map<String, String> anomalyProps = new HashMap<>();
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-    anomalyProps.put("sla", "2_DAYS");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-    anomalyDAO.delete(anomalies.get(0));  // clean up
-
-
-    // CHECK 3: Report data missing when delayed by (sla - 2 ms)
-    metricSla.put("sla", "3_DAYS");
-    props.put("thirdeye:metric:123", metricSla);
-    detectionConfigDTO.setDataSLAProperties(props);
-    this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
-    // 1st scan after issue
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 5 * period);
-    runner.execute(this.info, this.context);
-    // 0 data sla anomaly should be created as there is no delay
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 0);
-
-    // 2nd scan after issue
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 6 * period);
-    runner.execute(this.info, this.context);
-    // 0 data sla anomaly should be created as there is no delay
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 0);
-
-    // 3rd scan after issue
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 7 * period);
-    runner.execute(this.info, this.context);
-    // 1 data sla anomaly should be created as the data is delayed by 2 ms
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 1);
-    Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
-    Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 7 * period);
-    Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-    anomalyProps.put("sla", "3_DAYS");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-    anomalyDAO.delete(anomalies.get(0));  // clean up
-
-
-    // CHECK 4: Report data missing when there is a gap btw LastRefreshTime & SLA window start
-    metricSla.put("sla", "1_DAYS");
-    props.put("thirdeye:metric:123", metricSla);
-    detectionConfigDTO.setDataSLAProperties(props);
-    this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-    this.info.setStart(startTime + 5 * period);
-    this.info.setEnd(startTime + 6 * period);
-    runner.execute(this.info, this.context);
-
-    // 1 data sla anomaly should be created
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 1);
-    Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
-    Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 6 * period);
-    Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-    anomalyProps.put("sla", "1_DAYS");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-    anomalyDAO.delete(anomalies.get(0));  // clean up
-  }
-
-  /**
-   * Test that no data SLA anomalies are created when data is partially available
-   * We do not deal with partial data in this current iteration/phase
-   */
-  @Test
-  public void testDataSlaWhenDataPartiallyAvailable() throws Exception {
-    MetricSlice sliceOverlapBelowThreshold = MetricSlice.from(123L, startTime + 2 * period, startTime + 10 * period);
-    timeSeries.put(sliceOverlapBelowThreshold, new DataFrame()
-        .addSeries(COL_VALUE, 500, 1000)
-        .addSeries(COL_TIME, startTime + 2 * period, startTime + 3 * period));
-    MetricSlice sliceOverlapAboveThreshold = MetricSlice.from(123L, startTime + 2 * period, startTime + 4 * period);
-    timeSeries.put(sliceOverlapAboveThreshold, new DataFrame()
-        .addSeries(COL_VALUE, 500, 1000)
-        .addSeries(COL_TIME, startTime + 2 * period, startTime + 3 * period));
-    MockDataProvider mockDataProvider = new MockDataProvider()
-        .setTimeseries(timeSeries)
-        .setMetrics(Collections.singletonList(metricConfigDTO))
-        .setDatasets(Collections.singletonList(datasetConfigDTO));
-    DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-    datasetConfigDTO.setLastRefreshTime(0);
-    datasetDAO.update(datasetConfigDTO);
-
-    this.info.setStart(sliceOverlapBelowThreshold.getStart());
-    this.info.setEnd(sliceOverlapBelowThreshold.getEnd());
-    runner.execute(this.info, this.context);
-
-    // 1 data sla anomaly should be created for the missing days
-    List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 1);
-    Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
-    Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 10 * period);
-    Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
-    Map<String, String> anomalyProps = new HashMap<>();
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 3 * period));
-    anomalyProps.put("sla", "1_DAYS");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-    anomalyDAO.delete(anomalies.get(0));  // clean up
-
-    this.info.setStart(sliceOverlapAboveThreshold.getStart());
-    this.info.setEnd(sliceOverlapAboveThreshold.getEnd());
-    runner.execute(this.info, this.context);
-
-    // 0 data sla anomaly should be created as data is considered to be completely available (above threshold)
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 0);
-  }
-
-  /**
-   * Test that no Data SLA anomaly is created when data is complete and available
-   */
-  @Test
-  public void testDataSlaWhenDataAvailable() throws Exception {
-    MockDataProvider mockDataProvider = new MockDataProvider()
-        .setTimeseries(timeSeries)
-        .setMetrics(Collections.singletonList(metricConfigDTO))
-        .setDatasets(Collections.singletonList(datasetConfigDTO));
-    DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
-    // Prepare the data sla task over existing data in time-series
-    this.info.setStart(startTime + period);
-    this.info.setEnd(startTime + 3 * period);
-    runner.execute(this.info, this.context);
-
-    // No data sla anomaly should be created
-    List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 0);
-
-    // Prepare the data sla task over existing data in time-series
-    this.info.setStart(startTime);
-    this.info.setEnd(startTime + 4 * period);
-    runner.execute(this.info, this.context);
-
-    // No data sla anomaly should be created
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 0);
-  }
-
-  /**
-   * Test that no data SLA anomaly is created when dataset is available but dimensional data is missing
-   */
-  @Test
-  public void testDataSlaWhenDimensionalDataUnavailable() throws Exception {
-    // Prepare the mock dimensional time-series where 2 data points are missing but the overall dataset has complete data
-    Multimap<String, String> filters = HashMultimap.create();
-    filters.put("dim1", "1");
-    MetricSlice dimensionSlice = MetricSlice.from(123L, startTime, startTime + 4 * period, filters);
-    Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
-    // has only 2 data points
-    DataFrame dataFrame = new DataFrame()
-        .addSeries(COL_VALUE, 100, 200)
-        .addSeries(COL_TIME, startTime, startTime + period);
-    timeSeries.put(dimensionSlice, dataFrame);
-    MockDataProvider mockDataProvider = new MockDataProvider()
-        .setTimeseries(timeSeries)
-        .setMetrics(Collections.singletonList(metricConfigDTO))
-        .setDatasets(Collections.singletonList(datasetConfigDTO));
-    DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
-    Map<String, Object> metricSla = new HashMap<>();
-    metricSla.put("sla", "1_DAYS");
-    Map<String, Object> props = new HashMap<>();
-    props.put("thirdeye:metric:123:dim1%3D1", metricSla);
-    detectionConfigDTO.setDataSLAProperties(props);
-    this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
-    // Scan a period where dimensional data is missing but dataset is complete.
-    this.info.setConfigId(this.detectorId);
-    this.info.setStart(startTime + 3 * period);
-    this.info.setEnd(startTime + 4 * period);
-    runner.execute(this.info, this.context);
-
-    // 0 SLA anomaly should be created. Though dimensional data is missing, the dataset as a whole is complete.
-    List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 0);
-
-    // Prepare the data sla task on unavailable data
-    this.info.setConfigId(this.detectorId);
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 5 * period);
-    runner.execute(this.info, this.context);
-
-    // 1 SLA anomaly should be created
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 1);
-    Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
-    Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 5 * period);
-    Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
-    Map<String, String> anomalyProps = new HashMap<>();
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-    anomalyProps.put("sla", "1_DAYS");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-  }
-
-  /**
-   * Test if data sla anomalies are properly merged
-   */
-  @Test
-  public void testDataSlaAnomalyMerge() throws Exception {
-    MockDataProvider mockDataProvider = new MockDataProvider()
-        .setTimeseries(timeSeries)
-        .setMetrics(Collections.singletonList(metricConfigDTO))
-        .setDatasets(Collections.singletonList(datasetConfigDTO));
-    DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
-    // 1st scan
-    //  time:  3____4    5      // We have data till 3rd, data for 4th is missing/delayed
-    //  scan:       |----|
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 5 * period);
-    runner.execute(this.info, this.context);
-
-    // 1 data sla anomaly should be created from 4 to 5
-    List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 1);
-    MergedAnomalyResultDTO slaAnomaly = anomalies.get(0);
-    Assert.assertEquals(slaAnomaly.getStartTime(), startTime + 4 * period);
-    Assert.assertEquals(slaAnomaly.getEndTime(), startTime + 5 * period);
-    Assert.assertEquals(slaAnomaly.getType(), AnomalyType.DATA_MISSING);
-    Map<String, String> anomalyProps = new HashMap<>();
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-    anomalyProps.put("sla", "1_DAYS");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-
-    // 2nd scan
-    //  time:    3____4    5    6    // Data for 4th still hasn't arrived
-    //  scan:         |---------|
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 6 * period);
-    runner.execute(this.info, this.context);
-
-    // We should now have 2 anomalies in our database (1 parent, 1 child)
-    // The new anomaly (4 to 6) will be parent of the existing anomaly (4 to 5) created during the previous scan
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 2);
-    for (int i = 0; i < 2; i++) {
-      MergedAnomalyResultDTO anomalyDTO = anomalies.get(i);
-      if (!anomalyDTO.isChild()) {
-        // Parent anomaly
-        Assert.assertEquals(anomalyDTO.getStartTime(), startTime + 4 * period);
-        Assert.assertEquals(anomalyDTO.getEndTime(), startTime + 6 * period);
-        Assert.assertEquals(anomalyDTO.getType(), AnomalyType.DATA_MISSING);
-        Assert.assertTrue(anomalyDTO.getChildIds().contains(slaAnomaly.getId()));
-        anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-        anomalyProps.put("sla", "1_DAYS");
-        Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-      } else {
-        // Child anomaly
-        Assert.assertEquals(anomalyDTO.getStartTime(), startTime + 4 * period);
-        Assert.assertEquals(anomalyDTO.getEndTime(), startTime + 5 * period);
-        Assert.assertEquals(anomalyDTO.getType(), AnomalyType.DATA_MISSING);
-        Assert.assertTrue(anomalyDTO.isChild());
-        anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-        anomalyProps.put("sla", "1_DAYS");
-        Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-      }
-    }
-
-    // 3rd scan
-    //  time:  3____4____5    6    7    // Data for 4th has arrived by now, data for 5th is still missing
-    //  scan:       |--------------|
-    this.info.setStart(startTime + 4 * period);
-    this.info.setEnd(startTime + 7 * period);
-    datasetConfigDTO.setLastRefreshTime(startTime + 5 * period - 1);
-    datasetDAO.update(datasetConfigDTO);
-    runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-    runner.execute(this.info, this.context);
-
-    // We will now have 3 anomalies in our database (2 parent and 1 child)
-    // 1 parent and 1 child created during the previous iterations
-    // In the current iteration we will create 1 new anomaly from (5 to 7)
-    anomalies = anomalyDAO.findAll();
-    Assert.assertEquals(anomalies.size(), 3);
-    anomalies = anomalies.stream().filter(anomaly -> !anomaly.isChild()).collect(Collectors.toList());
-    // We should now have 2 parent anomalies
-    Assert.assertEquals(anomalies.size(), 2);
-    anomalies.get(0).setId(null);
-    anomalies.get(1).setId(null);
-
-    List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
-    MergedAnomalyResultDTO anomalyDTO = new MergedAnomalyResultDTO();
-    anomalyDTO.setStartTime(startTime + 5 * period);
-    anomalyDTO.setEndTime(startTime + 7 * period);
-    anomalyDTO.setType(AnomalyType.DATA_MISSING);
-    anomalyDTO.setDetectionConfigId(this.detectorId);
-    anomalyDTO.setChildIds(Collections.emptySet());
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
-    anomalyProps.put("sla", "1_DAYS");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-    expectedAnomalies.add(anomalyDTO);
-
-    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
-  }
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigSlaTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigSlaTranslatorTest.java
new file mode 100644
index 0000000..db9087b
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigSlaTranslatorTest.java
@@ -0,0 +1,121 @@
+package org.apache.pinot.thirdeye.detection.yaml.translator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker;
+import org.apache.pinot.thirdeye.detection.components.MockGrouper;
+import org.apache.pinot.thirdeye.detection.components.RuleBaselineProvider;
+import org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter;
+import org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class DetectionConfigSlaTranslatorTest {
+
+  private Long metricId;
+  private DataProvider provider;
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private DAOTestBase testDAOProvider;
+  private DAORegistry daoRegistry;
+
+  @BeforeClass
+  void beforeClass() {
+    testDAOProvider = DAOTestBase.getInstance();
+    daoRegistry = DAORegistry.getInstance();
+  }
+
+  @AfterClass(alwaysRun = true)
+  void afterClass() {
+    testDAOProvider.cleanup();
+  }
+
+  @BeforeMethod
+  public void setUp() {
+    MetricConfigDTO metricConfig = new MetricConfigDTO();
+    metricConfig.setAlias("alias");
+    metricConfig.setName("test_metric");
+    metricConfig.setDataset("test_dataset");
+    this.metricId = 1L;
+    metricConfig.setId(metricId);
+    daoRegistry.getMetricConfigDAO().save(metricConfig);
+
+    DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
+    datasetConfigDTO.setDataset("test_dataset");
+    datasetConfigDTO.setTimeUnit(TimeUnit.DAYS);
+    datasetConfigDTO.setTimeDuration(1);
+    datasetConfigDTO.setDataSource(PinotThirdEyeDataSource.DATA_SOURCE_NAME);
+    daoRegistry.getDatasetConfigDAO().save(datasetConfigDTO);
+
+    DetectionRegistry.registerComponent(DataSlaQualityChecker.class.getName(), "DATA_SLA");
+    DetectionRegistry.registerComponent(ThresholdRuleDetector.class.getName(), "THRESHOLD");
+    DetectionRegistry.registerComponent(ThresholdRuleAnomalyFilter.class.getName(), "THRESHOLD_RULE_FILTER");
+    DetectionRegistry.registerComponent(RuleBaselineProvider.class.getName(), "RULE_BASELINE");
+    DetectionRegistry.registerComponent(MockGrouper.class.getName(), "MOCK_GROUPER");
+    this.provider = new MockDataProvider().setMetrics(Collections.singletonList(metricConfig)).setDatasets(Collections.singletonList(datasetConfigDTO));
+  }
+
+  @Test
+  public void testSlaTranslation() throws Exception {
+    String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-1.yaml"), "UTF-8");
+    DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+    DetectionConfigDTO result = translator.translate();
+    YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-1.json"), YamlTranslationResult.class);
+    Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+    Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+  }
+
+  @Test
+  public void testDetectionAndSlaTranslation() throws Exception {
+    String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-2.yaml"), "UTF-8");
+    DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+    DetectionConfigDTO result = translator.translate();
+    YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-2.json"), YamlTranslationResult.class);
+    Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+    Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+  }
+
+  @Test
+  public void testMultipleDetectionFilterAndSlaTranslation() throws Exception {
+    String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-3.yaml"), "UTF-8");
+    DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+    DetectionConfigDTO result = translator.translate();
+    YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-3.json"), YamlTranslationResult.class);
+    Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+    Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+  }
+
+  @Test
+  public void testSlaTranslationWithSingleMetricEntityAlert() throws Exception {
+    String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-4.yaml"), "UTF-8");
+    DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+    DetectionConfigDTO result = translator.translate();
+    YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-4.json"), YamlTranslationResult.class);
+    Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+    Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+  }
+
+  @Test
+  public void testSlaTranslationWithMultiMetricEntityAlert() throws Exception {
+    String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-5.yaml"), "UTF-8");
+    DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+    DetectionConfigDTO result = translator.translate();
+    YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-5.json"), YamlTranslationResult.class);
+    Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+    Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java
index 10055fa..3f7dfa4 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 public class YamlTranslationResult {
   private Map<String, Object> properties;
+  private Map<String, Object> dataQualityProperties;
   private Map<String, Object> components;
   private String cron;
 
@@ -35,6 +36,14 @@ public class YamlTranslationResult {
     return properties;
   }
 
+  public void setDataQualityProperties(Map<String, Object> dataQualityProperties) {
+    this.dataQualityProperties = dataQualityProperties;
+  }
+
+  public Map<String, Object> getDataQualityProperties() {
+    return dataQualityProperties;
+  }
+
   public void setComponents(Map<String, Object> components) {
     this.components = components;
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java
index 5db1d3d..60d0d23 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java
@@ -18,6 +18,7 @@ package org.apache.pinot.thirdeye.notification.content.templates;
 
 import java.util.Properties;
 import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
 import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
@@ -174,7 +175,8 @@ public class TestMetricAnomaliesContent {
         new DateTime(2020, 1, 7, 17, 0, dateTimeZone).getMillis(),
         TEST, TEST, 0.1, 1l, new DateTime(2020, 1, 7, 17, 1, dateTimeZone).getMillis());
     anomaly.setDetectionConfigId(detectionConfigDTO.getId());
-    anomaly.setType(AnomalyType.DATA_MISSING);
+    anomaly.setType(AnomalyType.DATA_SLA);
+    anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
     anomaly.setMetricUrn("thirdeye:metric:3");
     Map<String, String> props = new HashMap<>();
     props.put("sla", "3_DAYS");
@@ -187,7 +189,8 @@ public class TestMetricAnomaliesContent {
         new DateTime(2020, 1, 7, 5, 5, dateTimeZone).getMillis(),
         TEST, TEST, 0.1, 1l, new DateTime(2020, 1, 7, 5, 6, dateTimeZone).getMillis());
     anomaly.setDetectionConfigId(detectionConfigDTO.getId());
-    anomaly.setType(AnomalyType.DATA_MISSING);
+    anomaly.setType(AnomalyType.DATA_SLA);
+    anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
     anomaly.setMetricUrn("thirdeye:metric:3");
     props = new HashMap<>();
     props.put("datasetLastRefreshTime", "" + new DateTime(2020, 1, 4, 0, 0, dateTimeZone).getMillis());
@@ -201,7 +204,8 @@ public class TestMetricAnomaliesContent {
         new DateTime(2020, 1, 1, 15, 0, dateTimeZone).getMillis(),
         TEST, TEST, 0.1, 1l, new DateTime(2020, 1, 1, 16, 0, dateTimeZone).getMillis());
     anomaly.setDetectionConfigId(detectionConfigDTO.getId());
-    anomaly.setType(AnomalyType.DATA_MISSING);
+    anomaly.setType(AnomalyType.DATA_SLA);
+    anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
     anomaly.setMetricUrn("thirdeye:metric:3");
     props = new HashMap<>();
     props.put("datasetLastRefreshTime", "" + new DateTime(2020, 1, 1, 10, 5, dateTimeZone).getMillis());
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java
index d20e169..a5311c0 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.pinot.thirdeye.anomaly.AnomalyType;
 import org.apache.pinot.thirdeye.anomaly.ThirdEyeAnomalyConfiguration;
 import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyResult;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
 import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
 import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
@@ -129,7 +130,8 @@ public class TestJiraContentFormatter {
     anomalyResultDTO2.setDetectionConfigId(this.detectionConfigId);
     anomalyResultDTO2.setCollection(COLLECTION_VALUE);
     anomalyResultDTO2.setMetric(METRIC_VALUE);
-    anomalyResultDTO2.setType(AnomalyType.DATA_MISSING);
+    anomalyResultDTO2.setType(AnomalyType.DATA_SLA);
+    anomalyResultDTO2.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
     anomalyResultDTO2.setMetricUrn("thirdeye:metric:124");
     Map<String, String> props = new HashMap<>();
     props.put("sla", "2_HOURS");
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java
index 777e7db..6931778 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java
@@ -725,13 +725,13 @@ public class RunAdhocDatabaseQueriesTool {
 
   public static void main(String[] args) throws Exception {
 
-    File persistenceFile = new File(args[0]);
+    File persistenceFile = new File("/Users/akrai/persistence-linux.yml");
     if (!persistenceFile.exists()) {
       System.err.println("Missing file:" + persistenceFile);
       System.exit(1);
     }
     RunAdhocDatabaseQueriesTool dq = new RunAdhocDatabaseQueriesTool(persistenceFile);
-    dq.disableAllActiveDetections(Collections.singleton(142644400L));
+    dq.disableAllActiveDetections(Collections.singleton(160640739L));
     LOG.info("DONE");
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-0.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-0.yaml
new file mode 100644
index 0000000..947ccd5
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-0.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+  - type: THRESHOLD
+    name: maxThreshold_1
+    params:
+      max: 100
+  quality:
+  - type: DATA_SLA
+    name: slaRule1
+    params:
+      sla: 0_DAYS
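+# An sla of 0_DAYS tolerates no delay, i.e. data is reported missing as soon as it is late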
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-1.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-1.yaml
new file mode 100644
index 0000000..192f6f1
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-1.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+    - type: THRESHOLD
+      name: maxThreshold_1
+      params:
+        max: 100
+  quality:
+  - type: DATA_SLA
+    name: slaRule1
+    params:
+      sla: 1_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-2.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-2.yaml
new file mode 100644
index 0000000..6c83a3e
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-2.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+    - type: THRESHOLD
+      name: maxThreshold_1
+      params:
+        max: 100
+  quality:
+  - type: DATA_SLA
+    name: slaRule1
+    params:
+      sla: 2_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-3.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-3.yaml
new file mode 100644
index 0000000..3bffe8d
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-3.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+    - type: THRESHOLD
+      name: maxThreshold_1
+      params:
+        max: 100
+  quality:
+  - type: DATA_SLA
+    name: slaRule1
+    params:
+      sla: 3_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-1.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-1.yaml
new file mode 100644
index 0000000..3d76605
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-1.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: test_metric
+dataset: test_dataset
+
+rules:
+- detection:
+  - type: THRESHOLD
+    name: maxThreshold_1
+    params:
+      max: 100
+  quality:
+  - type: DATA_SLA
+    name: slaRule1
+    params:
+      sla: 2_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-2.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-2.yaml
new file mode 100644
index 0000000..5468efb
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-2.yaml
@@ -0,0 +1,30 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: test_metric
+dataset: test_dataset
+
+filters:
+  D1:
+  - v1
+  - v2
+  D2:
+  - v3
+dimensionExploration:
+  dimensions:
+  - D1
+  - D2
+  minContribution: 0.05
+
+rules:
+- detection:
+  - type: THRESHOLD
+    name: rule1
+    params:
+      max: 100
+  quality:
+  - type: DATA_SLA
+    name: slaRule1
+    params:
+      sla: 2_DAYS
+
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-3.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-3.yaml
new file mode 100644
index 0000000..2c317c1
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-3.yaml
@@ -0,0 +1,65 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: test_metric
+dataset: test_dataset
+
+filters:
+  D1:
+    - v1
+    - v2
+  D2:
+    - v3
+dimensionExploration:
+  dimensions:
+    - D1
+    - D2
+  minContribution: 0.05
+
+rules:
+- detection:
+  - type: THRESHOLD
+    name: maxThreshold_1
+    params:
+      max: 100
+  filter:
+  - type: THRESHOLD_RULE_FILTER
+    name: thresholdFilter_1
+    params:
+      min: 50
+  - type: THRESHOLD_RULE_FILTER
+    name: thresholdFilter_2
+    params:
+      min: 50
+
+- detection:
+  - type: THRESHOLD
+    name: maxThreshold_3
+    params:
+      max: 100
+  quality:
+  - type: DATA_SLA
+    name: slaRule1
+    params:
+      sla: 2_DAYS
+
+- detection:
+  - type: THRESHOLD
+    name: maxThreshold_2
+    params:
+      max: 100
+  filter:
+  - type: THRESHOLD_RULE_FILTER
+    name: thresholdFilter_3
+    params:
+      min: 50
+
+merger:
+  maxGap: 0
+  maxDuration: 100000000
+
+grouper:
+- type: MOCK_GROUPER
+  name: test_grouper
+  params:
+    mockParam: 0.3
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-4.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-4.yaml
new file mode 100644
index 0000000..4876eb9
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-4.yaml
@@ -0,0 +1,33 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+type: COMPOSITE_ALERT
+cron: "0 0 14 * * ? *"
+
+alerts:
+- type: METRIC_ALERT
+  name: metric alert on test_metric
+  metric: test_metric
+  dataset: test_dataset
+  filters:
+    D1:
+    - v1
+    - v2
+    D2:
+    - v3
+  dimensionExploration:
+    dimensions:
+    - D1
+    - D2
+    minContribution: 0.05
+  rules:
+  - detection:
+      - type: THRESHOLD
+        name: maxThreshold_1
+        params:
+          max: 100
+    quality:
+      - type: DATA_SLA
+        name: slaRule1
+        params:
+          sla: 2_DAYS
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-5.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-5.yaml
new file mode 100644
index 0000000..d738842
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-5.yaml
@@ -0,0 +1,90 @@
+detectionName: test_sla_alert
+description: My test sla alert
+type: COMPOSITE_ALERT
+cron: "0 0 14 * * ? *"
+alerts:
+- type: METRIC_ALERT
+  name: metric_alert_on_test_metric_1
+  metric: test_metric
+  dataset: test_dataset
+  filters:
+    D1:
+    - v1
+    - v2
+    D2:
+    - v3
+  dimensionExploration:
+    dimensions:
+    - D1
+    - D2
+    minContribution: 0.05
+  rules:
+  - detection:
+    - type: THRESHOLD
+      name: maxThreshold_1
+      params:
+        max: 100
+    filter:
+    - type: THRESHOLD_RULE_FILTER
+      name: thresholdFilter_1
+      params:
+        min: 50
+    - type: THRESHOLD_RULE_FILTER
+      name: thresholdFilter_2
+      params:
+        min: 100
+  - detection:
+    - type: THRESHOLD
+      name: maxThreshold_3
+      params:
+        max: 100
+    quality:
+    - type: DATA_SLA
+      name: slaRule3
+      params:
+        sla: 2_DAYS
+  - detection:
+    - type: THRESHOLD
+      name: maxThreshold_2
+      params:
+        max: 100
+
+- type: COMPOSITE_ALERT
+  name: composite_alert_on_entity
+  alerts:
+  - type: METRIC_ALERT
+    name: metric_alert_on_test_metric_2
+    metric: test_metric
+    dataset: test_dataset
+    rules:
+    - detection:
+      - type: THRESHOLD
+        name: maxThreshold_1
+        params:
+          max: 100
+      quality:
+      - type: DATA_SLA
+        name: slaRule1
+        params:
+          sla: 2_DAYS
+  - type: METRIC_ALERT
+    name: another_metric_alert_on_test_metric
+    metric: test_metric
+    dataset: test_dataset
+    rules:
+    - detection:
+      - type: THRESHOLD
+        name: maxThreshold_1
+        params:
+          max: 100
+      quality:
+        - type: DATA_SLA
+          name: slaRule2
+          params:
+            sla: 2_DAYS
+
+grouper:
+  - type: MOCK_GROUPER
+    name: test_grouper
+    params:
+      mockParam: 0.3
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-1.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-1.json
new file mode 100644
index 0000000..ddebd38
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-1.json
@@ -0,0 +1,23 @@
+{
+  "dataQualityProperties": {
+    "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+    "nested": [{
+      "qualityCheck": "$slaRule1:DATA_SLA",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+      "metricUrn": "thirdeye:metric:1",
+      "subEntityName": "test_sla_alert"
+    }]
+  },
+  "components": {
+    "slaRule1:DATA_SLA": {
+      "sla": "2_DAYS",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+      "metricUrn": "thirdeye:metric:1"
+    },
+    "maxThreshold_1:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1"
+    }
+  }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-2.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-2.json
new file mode 100644
index 0000000..59a0b31
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-2.json
@@ -0,0 +1,23 @@
+{
+  "dataQualityProperties": {
+    "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+    "nested": [{
+      "qualityCheck": "$slaRule1:DATA_SLA",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+      "subEntityName": "test_sla_alert"
+    }]
+  },
+  "components": {
+    "slaRule1:DATA_SLA": {
+      "sla": "2_DAYS",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "rule1:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    }
+  }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-3.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-3.json
new file mode 100644
index 0000000..e3c363b
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-3.json
@@ -0,0 +1,57 @@
+{
+  "dataQualityProperties": {
+    "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+    "maxGap": 0,
+    "maxDuration": 100000000,
+    "nested": [{
+      "qualityCheck": "$slaRule1:DATA_SLA",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+      "subEntityName": "test_sla_alert",
+      "maxGap": 0,
+      "maxDuration": 100000000
+    }]
+  },
+  "components": {
+    "slaRule1:DATA_SLA": {
+      "sla": "2_DAYS",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "maxThreshold_1:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "maxThreshold_2:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "maxThreshold_3:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "thresholdFilter_1:THRESHOLD_RULE_FILTER": {
+      "min": 50,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "thresholdFilter_2:THRESHOLD_RULE_FILTER": {
+      "min": 50,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "thresholdFilter_3:THRESHOLD_RULE_FILTER": {
+      "min": 50,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "test_grouper:MOCK_GROUPER": {
+      "mockParam": 0.3,
+      "className": "org.apache.pinot.thirdeye.detection.components.MockGrouper",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    }
+  }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-4.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-4.json
new file mode 100644
index 0000000..6a788dc
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-4.json
@@ -0,0 +1,26 @@
+{
+  "dataQualityProperties": {
+    "className": "org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+    "nested": [{
+      "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+      "nested": [{
+        "qualityCheck": "$slaRule1:DATA_SLA",
+        "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+        "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+        "subEntityName": "metric alert on test_metric"
+      }]
+    }]
+  },
+  "components": {
+    "slaRule1:DATA_SLA": {
+      "sla": "2_DAYS",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "maxThreshold_1:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    }
+  }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-5.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-5.json
new file mode 100644
index 0000000..58149a0
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-5.json
@@ -0,0 +1,103 @@
+{
+  "dataQualityProperties": {
+    "className": "org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+    "nested": [
+      {
+        "className": "org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper",
+        "nested": [
+          {
+            "grouper": "$test_grouper:MOCK_GROUPER",
+            "className": "org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper",
+            "subEntityName": "test_sla_alert",
+            "nested": [
+              {
+                "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+                "nested": [
+                  {
+                    "qualityCheck": "$slaRule3:DATA_SLA",
+                    "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+                    "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+                    "subEntityName": "metric_alert_on_test_metric_1"
+                  }
+                ]
+              },
+              {
+                "className": "org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+                "nested": [
+                  {
+                    "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+                    "nested": [
+                      {
+                        "qualityCheck": "$slaRule1:DATA_SLA",
+                        "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+                        "metricUrn": "thirdeye:metric:1",
+                        "subEntityName": "metric_alert_on_test_metric_2"
+                      }
+                    ]
+                  },
+                  {
+                    "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+                    "nested": [
+                      {
+                        "qualityCheck": "$slaRule2:DATA_SLA",
+                        "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+                        "metricUrn": "thirdeye:metric:1",
+                        "subEntityName": "another_metric_alert_on_test_metric"
+                      }
+                    ]
+                  }
+                ]
+              }
+            ]
+          }
+        ]
+      }
+    ]
+  },
+  "components": {
+    "slaRule1:DATA_SLA": {
+      "sla": "2_DAYS",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+      "metricUrn": "thirdeye:metric:1"
+    },
+    "slaRule2:DATA_SLA": {
+      "sla": "2_DAYS",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+      "metricUrn": "thirdeye:metric:1"
+    },
+    "slaRule3:DATA_SLA": {
+      "sla": "2_DAYS",
+      "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "maxThreshold_1:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1"
+    },
+    "maxThreshold_2:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "maxThreshold_3:THRESHOLD": {
+      "max": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "thresholdFilter_1:THRESHOLD_RULE_FILTER": {
+      "min": 50,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "thresholdFilter_2:THRESHOLD_RULE_FILTER": {
+      "min": 100,
+      "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+      "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+    },
+    "test_grouper:MOCK_GROUPER": {
+      "mockParam": 0.3,
+      "className": "org.apache.pinot.thirdeye.detection.components.MockGrouper"
+    }
+  }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl b/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl
index 9453cc8..f12b9bb 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl
+++ b/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl
@@ -8,8 +8,8 @@ ThirdEye has detected [*2 anomalies*|test/app/#/anomalies?anomalyIds=4,5] on the
 *Description:*
 
 ||Start||Duration||Type||Dimensions||Current||Predicted||Change||
-|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=4]|24 hours   |Deviation   |key:value\\|_0_|_0_      |_+100.00 %_|
-|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=5]|12.75 hours|Data Missing|-|_0_|_2_HOURS_|_+12 hours & 45mins_|
+|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=4]|24 hours   |Deviation    |key:value\\|_0_|_0_      |_+100.00 %_|
+|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=5]|12.75 hours|SLA Violation|-|_0_|_2_HOURS_|_+12 hours & 45mins_|
 
 =======================================================================================
 
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html b/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html
index 9f66b1d..530b07b 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html
+++ b/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html
@@ -90,7 +90,7 @@
                   <a style="font-weight: bold; text-decoration: none; font-size:14px; line-height:20px; color: #0073B1;" href="http://localhost:8080/dashboard/app/#/rootcause?anomalyId=6"
                      target="_blank">(view)</a>
                 </td>
-                <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">Data Missing</td>
+                <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">SLA Violation</td>
                 <td style="word-break: break-all; width: 200px; padding-right:4px 20px 4px 0"></td>
                 <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">0</td>
                 <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">
@@ -105,7 +105,7 @@
                   <a style="font-weight: bold; text-decoration: none; font-size:14px; line-height:20px; color: #0073B1;" href="http://localhost:8080/dashboard/app/#/rootcause?anomalyId=7"
                      target="_blank">(view)</a>
                 </td>
-                <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">Data Missing</td>
+                <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">SLA Violation</td>
                 <td style="word-break: break-all; width: 200px; padding-right:4px 20px 4px 0"></td>
                 <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">0</td>
                 <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">
@@ -121,7 +121,7 @@
                   <a style="font-weight: bold; text-decoration: none; font-size:14px; line-height:20px; color: #0073B1;" href="http://localhost:8080/dashboard/app/#/rootcause?anomalyId=8"
                      target="_blank">(view)</a>
                 </td>
-                <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">Data Missing</td>
+                <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">SLA Violation</td>
                 <td style="word-break: break-all; width: 200px; padding-right:4px 20px 4px 0"></td>
                 <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">0</td>
                 <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">

