Posted to commits@pinot.apache.org by ak...@apache.org on 2020/05/22 01:24:40 UTC
[incubator-pinot] branch master updated: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml (#5398)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 598c12b [TE] Added generic support for configuring Data Quality rules like SLA from Yaml (#5398)
598c12b is described below
commit 598c12b69effddbab6c011ec2fb6f87f6e03b7b6
Author: Akshay Rai <ak...@linkedin.com>
AuthorDate: Thu May 21 18:24:27 2020 -0700
[TE] Added generic support for configuring Data Quality rules like SLA from Yaml (#5398)
**Overall Updates:**
* Added the ability to configure Data Quality (DQ) alerts from YAML. See the sample below.
* The DQ pipeline leverages and extends the existing detection merging logic.
* Refactored the code to modularize and reuse the detection translators, and extended them into data quality translators.
* Updated the DATA_QUALITY task and pipeline runner.
* Added the following metrics for tracking: # DQ tasks, # DQ successful tasks, and # DQ failed tasks.
Note that this PR supports data quality checks only when detection is enabled; due to current limitations, it does not address standalone data quality checks.
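In practice, a DATA_QUALITY task is only scheduled alongside a detection task, and only when the detection config carries quality properties (see DetectionUtils.isDataQualityCheckEnabled in the diff below). A minimal, self-contained sketch of that guard is shown here; the class and field names are hypothetical placeholders, not ThirdEye's actual DTOs:
```
import java.util.Collections;
import java.util.Map;

// Illustrative only: class and field names are hypothetical, not ThirdEye's actual API.
public class QualityGuardSketch {
  static class DetectionConfig {
    final Map<String, Object> dataQualityProperties; // populated by the YAML translator

    DetectionConfig(Map<String, Object> props) {
      this.dataQualityProperties = props;
    }
  }

  // Mirrors the guard added in DetectionUtils: quality checks run only if the config
  // carries a non-empty data quality properties map.
  static boolean isDataQualityCheckEnabled(DetectionConfig config) {
    return config.dataQualityProperties != null && !config.dataQualityProperties.isEmpty();
  }

  public static void main(String[] args) {
    DetectionConfig withQuality =
        new DetectionConfig(Collections.singletonMap("qualityCheck", "DATA_SLA:slaRule1"));
    DetectionConfig withoutQuality = new DetectionConfig(Collections.emptyMap());
    System.out.println(isDataQualityCheckEnabled(withQuality));    // true  -> schedule DATA_QUALITY task too
    System.out.println(isDataQualityCheckEnabled(withoutQuality)); // false -> schedule DETECTION task only
  }
}
```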
Doc Link: Available on the Alert Config page (internal)
**Tests:**
* Added unit tests covering the translation of DQ YAML configs into the internal JSON representation.
* Updated unit tests to validate the Data SLA checker.
* Tested the Data Quality/SLA pipeline by deploying it locally.
**Example:**
```
...
rules:
- detection:
    ...
  quality:
  - type: DATA_SLA
    name: slaRule1
    params:
      sla: 1_DAYS
```
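For readers unfamiliar with the alert YAML, a fuller sketch of where the quality block sits is shown below. The metric, dataset, and the THRESHOLD detection rule are hypothetical placeholders added only for illustration; the quality section mirrors the sample above, and field names outside it may differ from the exact schema.
```
# Hypothetical alert config for illustration; only the quality block mirrors the sample above.
detectionName: example_sla_alert
metric: page_views
dataset: example_dataset
rules:
- detection:
  - type: THRESHOLD
    name: maxThresholdRule
    params:
      max: 1000000
  quality:
  - type: DATA_SLA
    name: slaRule1
    params:
      sla: 1_DAYS
```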
---
.../apache/pinot/thirdeye/anomaly/AnomalyType.java | 2 +-
.../trigger/DataAvailabilityTaskScheduler.java | 11 +-
.../pinot/thirdeye/anomaly/task/TaskConstants.java | 2 +-
.../thirdeye/anomaly/task/TaskInfoFactory.java | 2 +-
.../thirdeye/anomaly/task/TaskRunnerFactory.java | 6 +-
.../anomaly/utils/ThirdeyeMetricsUtil.java | 12 +-
.../thirdeye/constant/AnomalyResultSource.java | 1 +
.../datalayer/pojo/DetectionConfigBean.java | 12 +-
.../pinot/thirdeye/detection/DetectionUtils.java | 17 +-
.../alert/StatefulDetectionAlertFilter.java | 12 +-
.../DataQualityPipelineJob.java} | 20 +-
.../dataquality/DataQualityPipelineTaskRunner.java | 143 +++++
.../components/DataSlaQualityChecker.java | 216 +++++++
.../spec/DataSlaQualityCheckerSpec.java} | 35 +-
.../dataquality/wrapper/DataSlaWrapper.java | 103 ++++
.../detection/datasla/DatasetSlaTaskRunner.java | 320 ----------
.../validators/DetectionConfigValidator.java | 1 +
.../wrapper/BaselineFillingMergeWrapper.java | 2 +
.../wrapper/ChildKeepingMergeWrapper.java | 6 +-
.../detection/wrapper/DataQualityMergeWrapper.java | 99 ++++
.../yaml/translator/DetectionConfigTranslator.java | 398 +------------
.../translator/DetectionMetricAttributeHolder.java | 29 +-
.../builder/DataQualityPropertiesBuilder.java | 139 +++++
.../builder/DetectionConfigPropertiesBuilder.java | 222 +++++++
.../builder/DetectionPropertiesBuilder.java | 243 ++++++++
.../formatter/DetectionConfigFormatter.java | 8 +-
.../content/BaseNotificationContent.java | 8 +-
.../thirdeye/scheduler/DetectionCronScheduler.java | 71 ++-
.../scheduler/SubscriptionCronScheduler.java | 19 +-
.../thirdeye/scheduler/ThirdEyeCronScheduler.java | 6 +-
.../trigger/DataAvailabilityTaskSchedulerTest.java | 4 +-
.../pinot/thirdeye/detection/MockDataProvider.java | 13 +
.../dataquality/DataQualityTaskRunnerTest.java | 642 +++++++++++++++++++++
.../datasla/DatasetSlaTaskRunnerTest.java | 478 ---------------
.../DetectionConfigSlaTranslatorTest.java | 121 ++++
.../yaml/translator/YamlTranslationResult.java | 9 +
.../templates/TestMetricAnomaliesContent.java | 10 +-
.../channels/TestJiraContentFormatter.java | 4 +-
.../tools/RunAdhocDatabaseQueriesTool.java | 4 +-
.../detection/dataquality/sla-config-0.yaml | 17 +
.../detection/dataquality/sla-config-1.yaml | 17 +
.../detection/dataquality/sla-config-2.yaml | 17 +
.../detection/dataquality/sla-config-3.yaml | 17 +
.../detection/yaml/translator/sla-config-1.yaml | 17 +
.../detection/yaml/translator/sla-config-2.yaml | 30 +
.../detection/yaml/translator/sla-config-3.yaml | 65 +++
.../detection/yaml/translator/sla-config-4.yaml | 33 ++
.../detection/yaml/translator/sla-config-5.yaml | 90 +++
.../yaml/translator/sla-config-translated-1.json | 23 +
.../yaml/translator/sla-config-translated-2.json | 23 +
.../yaml/translator/sla-config-translated-3.json | 57 ++
.../yaml/translator/sla-config-translated-4.json | 26 +
.../yaml/translator/sla-config-translated-5.json | 103 ++++
.../resources/test-jira-anomalies-template.ftl | 4 +-
.../resources/test-metric-anomalies-template.html | 6 +-
55 files changed, 2692 insertions(+), 1303 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java
index 364f443..5fa2c2f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/AnomalyType.java
@@ -27,7 +27,7 @@ public enum AnomalyType {
// There is a trend change for underline metric.
TREND_CHANGE ("Trend Change"),
// The metric is not available within specified time.
- DATA_MISSING ("Data Missing");
+ DATA_SLA ("SLA Violation");
private String label;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
index c23e40e..d05bfa8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
@@ -122,9 +122,10 @@ public class DataAvailabilityTaskScheduler implements Runnable {
long endtime = System.currentTimeMillis();
createDetectionTask(detectionConfig, endtime);
- if (DetectionUtils.isDataAvailabilityCheckEnabled(detectionConfig)) {
- createDataSLACheckTask(detectionConfig, endtime);
- LOG.info("Scheduling a task for data availability {} due to the fallback mechanism.", detectionConfigId);
+ if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) {
+ createDataQualityTask(detectionConfig, endtime);
+ LOG.info("Scheduling a task for data sla check on detection config {} due to the fallback mechanism.",
+ detectionConfigId);
}
detectionIdToLastTaskEndTimeMap.put(detectionConfig.getId(), endtime);
@@ -213,8 +214,8 @@ public class DataAvailabilityTaskScheduler implements Runnable {
return createTask(TaskConstants.TaskType.DETECTION, detectionConfig, end);
}
- private long createDataSLACheckTask(DetectionConfigDTO detectionConfig, long end) throws JsonProcessingException {
- return createTask(TaskConstants.TaskType.DATA_SLA, detectionConfig, end);
+ private long createDataQualityTask(DetectionConfigDTO detectionConfig, long end) throws JsonProcessingException {
+ return createTask(TaskConstants.TaskType.DATA_QUALITY, detectionConfig, end);
}
private void loadLatestTaskCreateTime(DetectionConfigDTO detectionConfig) throws Exception {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
index 206c8bb..728c1b8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
@@ -22,7 +22,7 @@ package org.apache.pinot.thirdeye.anomaly.task;
public class TaskConstants {
public enum TaskType {
- DATA_SLA,
+ DATA_QUALITY,
DETECTION,
DETECTION_ALERT,
YAML_DETECTION_ONBOARD,
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java
index 6a3a19a..08dd300 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskInfoFactory.java
@@ -46,7 +46,7 @@ public class TaskInfoFactory {
TaskInfo taskInfo = null;
try {
switch(taskType) {
- case DATA_SLA:
+ case DATA_QUALITY:
taskInfo = OBJECT_MAPPER.readValue(taskInfoString, DetectionPipelineTaskInfo.class);
break;
case DETECTION:
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java
index fe160f7..4f2012b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskRunnerFactory.java
@@ -27,7 +27,7 @@ import org.apache.pinot.thirdeye.anomaly.task.TaskConstants.TaskType;
import org.apache.pinot.thirdeye.completeness.checker.DataCompletenessTaskRunner;
import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskRunner;
import org.apache.pinot.thirdeye.detection.alert.DetectionAlertTaskRunner;
-import org.apache.pinot.thirdeye.detection.datasla.DatasetSlaTaskRunner;
+import org.apache.pinot.thirdeye.detection.dataquality.DataQualityPipelineTaskRunner;
import org.apache.pinot.thirdeye.detection.onboard.YamlOnboardingTaskRunner;
@@ -39,8 +39,8 @@ public class TaskRunnerFactory {
public static TaskRunner getTaskRunnerFromTaskType(TaskType taskType) {
TaskRunner taskRunner = null;
switch (taskType) {
- case DATA_SLA:
- taskRunner = new DatasetSlaTaskRunner();
+ case DATA_QUALITY:
+ taskRunner = new DataQualityPipelineTaskRunner();
break;
case DETECTION:
taskRunner = new DetectionPipelineTaskRunner();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
index d6de6c2..cffdd26 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/utils/ThirdeyeMetricsUtil.java
@@ -62,15 +62,21 @@ public class ThirdeyeMetricsUtil {
public static final Counter detectionTaskCounter =
metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "detectionTaskCounter");
- public static final Counter dataAvailabilityTaskCounter =
- metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataAvailabilityTaskCounter");
-
public static final Counter detectionTaskSuccessCounter =
metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "detectionTaskSuccessCounter");
public static final Counter detectionTaskExceptionCounter =
metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "detectionTaskExceptionCounter");
+ public static final Counter dataQualityTaskCounter =
+ metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataQualityTaskCounter");
+
+ public static final Counter dataQualityTaskSuccessCounter =
+ metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataQualityTaskSuccessCounter");
+
+ public static final Counter dataQualityTaskExceptionCounter =
+ metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "dataQualityTaskExceptionCounter");
+
public static final Counter alertTaskCounter =
metricsRegistry.newCounter(ThirdeyeMetricsUtil.class, "alertTaskCounter");
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java
index fa41686..6ce3a9d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyResultSource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.thirdeye.constant;
public enum AnomalyResultSource {
DEFAULT_ANOMALY_DETECTION,
+ DATA_QUALITY_DETECTION,
ANOMALY_REPLAY,
USER_LABELED_ANOMALY
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java
index bb587a0..0a0a700 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/DetectionConfigBean.java
@@ -47,7 +47,7 @@ public class DetectionConfigBean extends AbstractBean {
List<String> owners;
// Stores properties related to data SLA rules for every metric
- Map<String, Object> dataSLAProperties;
+ Map<String, Object> dataQualityProperties;
boolean isDataAvailabilitySchedule;
long taskTriggerFallBackTimeInSec;
@@ -133,12 +133,12 @@ public class DetectionConfigBean extends AbstractBean {
this.active = active;
}
- public Map<String, Object> getDataSLAProperties() {
- return dataSLAProperties;
+ public Map<String, Object> getDataQualityProperties() {
+ return dataQualityProperties;
}
- public void setDataSLAProperties(Map<String, Object> dataSLAProperties) {
- this.dataSLAProperties = dataSLAProperties;
+ public void setDataQualityProperties(Map<String, Object> dataQualityProperties) {
+ this.dataQualityProperties = dataQualityProperties;
}
public boolean isDataAvailabilitySchedule() {
@@ -176,7 +176,7 @@ public class DetectionConfigBean extends AbstractBean {
DetectionConfigBean that = (DetectionConfigBean) o;
return lastTimestamp == that.lastTimestamp && active == that.active && Objects.equals(cron, that.cron)
&& Objects.equals(name, that.name) && Objects.equals(properties, that.properties) && Objects.equals(yaml,
- that.yaml) && Objects.equals(dataSLAProperties, that.dataSLAProperties)
+ that.yaml) && Objects.equals(dataQualityProperties, that.dataQualityProperties)
&& Objects.equals(isDataAvailabilitySchedule, that.isDataAvailabilitySchedule) && Objects
.equals(taskTriggerFallBackTimeInSec, that.taskTriggerFallBackTimeInSec);
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
index d993e28..4352d1a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
@@ -166,10 +166,13 @@ public class DetectionUtils {
* @return anomaly template
*/
public static MergedAnomalyResultDTO makeAnomaly(MetricSlice slice) {
- MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
- anomaly.setStartTime(slice.getStart());
- anomaly.setEndTime(slice.getEnd());
+ return makeAnomaly(slice.getStart(), slice.getEnd());
+ }
+ public static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
+ MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+ anomaly.setStartTime(start);
+ anomaly.setEndTime(end);
return anomaly;
}
@@ -344,11 +347,11 @@ public class DetectionUtils {
}
/**
- * Verify if this detection has data availability checks enabled
+ * Verify if this detection has data quality checks enabled
*/
- public static boolean isDataAvailabilityCheckEnabled(DetectionConfigDTO detectionConfig) {
- return detectionConfig.getDataSLAProperties() != null
- && !detectionConfig.getDataSLAProperties().isEmpty();
+ public static boolean isDataQualityCheckEnabled(DetectionConfigDTO detectionConfig) {
+ return detectionConfig.getDataQualityProperties() != null
+ && !detectionConfig.getDataQualityProperties().isEmpty();
}
public static long makeTimeout(long deadline) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
index b099677..4437c0f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
@@ -78,12 +78,12 @@ public abstract class StatefulDetectionAlertFilter extends DetectionAlertFilter
Collection<MergedAnomalyResultDTO> anomalies =
Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
@Override
- public boolean apply(@Nullable MergedAnomalyResultDTO mergedAnomalyResultDTO) {
- return mergedAnomalyResultDTO != null
- && !mergedAnomalyResultDTO.isChild()
- && !AlertUtils.hasFeedback(mergedAnomalyResultDTO)
- && mergedAnomalyResultDTO.getCreatedTime() > finalStartTime
- && mergedAnomalyResultDTO.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION);
+ public boolean apply(@Nullable MergedAnomalyResultDTO anomaly) {
+ return anomaly != null && !anomaly.isChild()
+ && !AlertUtils.hasFeedback(anomaly)
+ && anomaly.getCreatedTime() > finalStartTime
+ && (anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION) ||
+ anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION));
}
});
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionDataSLAJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
similarity index 76%
rename from thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionDataSLAJob.java
rename to thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
index b63a412..13a2989 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionDataSLAJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pinot.thirdeye.detection;
+package org.apache.pinot.thirdeye.detection.dataquality;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,14 +26,20 @@ import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.TaskUtils;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DetectionDataSLAJob implements Job {
- private static final Logger LOG = LoggerFactory.getLogger(DetectionDataSLAJob.class);
+/**
+ * The data quality job submitted to the scheduler. This job creates data quality tasks which
+ * the runners will later pick and execute.
+ */
+public class DataQualityPipelineJob implements Job {
+ private static final Logger LOG = LoggerFactory.getLogger(DataQualityPipelineJob.class);
private final TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
@@ -45,10 +51,10 @@ public class DetectionDataSLAJob implements Job {
DetectionPipelineTaskInfo taskInfo = TaskUtils.buildTaskInfo(jobExecutionContext);
// if a task is pending and not time out yet, don't schedule more
- String jobName = String.format("%s_%d", TaskConstants.TaskType.DATA_SLA, taskInfo.configId);
+ String jobName = String.format("%s_%d", TaskConstants.TaskType.DATA_QUALITY, taskInfo.getConfigId());
if (TaskUtils.checkTaskAlreadyRun(jobName, taskInfo, DATA_AVAILABILITY_TASK_TIMEOUT)) {
LOG.info("Skip scheduling {} task for {} with start time {}. Task is already in the queue.",
- TaskConstants.TaskType.DATA_SLA, jobName, taskInfo.getStart());
+ TaskConstants.TaskType.DATA_QUALITY, jobName, taskInfo.getStart());
return;
}
@@ -59,9 +65,9 @@ public class DetectionDataSLAJob implements Job {
LOG.error("Exception when converting DetectionPipelineTaskInfo {} to jsonString", taskInfo, e);
}
- TaskDTO taskDTO = TaskUtils.buildTask(taskInfo.configId, taskInfoJson, TaskConstants.TaskType.DATA_SLA);
+ TaskDTO taskDTO = TaskUtils.buildTask(taskInfo.getConfigId(), taskInfoJson, TaskConstants.TaskType.DATA_QUALITY);
long taskId = taskDAO.save(taskDTO);
- LOG.info("Created {} task {} with taskId {}", TaskConstants.TaskType.DATA_SLA, taskDTO, taskId);
+ LOG.info("Created {} task {} with taskId {}", TaskConstants.TaskType.DATA_QUALITY, taskDTO, taskId);
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineTaskRunner.java
new file mode 100644
index 0000000..80081bf
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineTaskRunner.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
+import org.apache.pinot.thirdeye.anomaly.task.TaskInfo;
+import org.apache.pinot.thirdeye.anomaly.task.TaskResult;
+import org.apache.pinot.thirdeye.anomaly.task.TaskRunner;
+import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is responsible for running the data quality tasks
+ */
+public class DataQualityPipelineTaskRunner implements TaskRunner {
+ private static final Logger LOG = LoggerFactory.getLogger(DataQualityPipelineTaskRunner.class);
+ private final DetectionConfigManager detectionDAO;
+ private final MergedAnomalyResultManager anomalyDAO;
+ private final EvaluationManager evaluationDAO;
+ private final DetectionPipelineLoader loader;
+ private final DataProvider provider;
+
+ /**
+ * Default constructor for ThirdEye task execution framework.
+ * Loads dependencies from DAORegistry and CacheRegistry
+ *
+ * @see DAORegistry
+ * @see ThirdEyeCacheRegistry
+ */
+ public DataQualityPipelineTaskRunner() {
+ this.loader = new DetectionPipelineLoader();
+ this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
+ this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+ this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+ MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
+ DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+ EventManager eventDAO = DAORegistry.getInstance().getEventDAO();
+
+ TimeSeriesLoader timeseriesLoader =
+ new DefaultTimeSeriesLoader(metricDAO, datasetDAO,
+ ThirdEyeCacheRegistry.getInstance().getQueryCache(), ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+ AggregationLoader aggregationLoader =
+ new DefaultAggregationLoader(metricDAO, datasetDAO,
+ ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+ ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+ this.provider = new DefaultDataProvider(metricDAO, datasetDAO, eventDAO, this.anomalyDAO, this.evaluationDAO,
+ timeseriesLoader, aggregationLoader, new DetectionPipelineLoader(), TimeSeriesCacheBuilder.getInstance(),
+ AnomaliesCacheBuilder.getInstance());
+ }
+
+ public DataQualityPipelineTaskRunner(DetectionConfigManager detectionDAO, MergedAnomalyResultManager anomalyDAO,
+ EvaluationManager evaluationDAO, DetectionPipelineLoader loader, DataProvider provider) {
+ this.detectionDAO = detectionDAO;
+ this.anomalyDAO = anomalyDAO;
+ this.evaluationDAO = evaluationDAO;
+ this.loader = loader;
+ this.provider = provider;
+ }
+
+ @Override
+ public List<TaskResult> execute(TaskInfo taskInfo, TaskContext taskContext) throws Exception {
+ ThirdeyeMetricsUtil.dataQualityTaskCounter.inc();
+
+ try {
+ DetectionPipelineTaskInfo info = (DetectionPipelineTaskInfo) taskInfo;
+ DetectionConfigDTO config = this.detectionDAO.findById(info.getConfigId());
+ if (config == null) {
+ throw new IllegalArgumentException(String.format("Could not resolve config id %d", info.getConfigId()));
+ }
+
+ LOG.info("Start data quality check for config {} between {} and {}", config.getId(), info.getStart(), info.getEnd());
+ Map<String, Object> props = config.getProperties();
+ // A small hack to reuse the properties field to run the data quality pipeline; this is reverted after the run.
+ config.setProperties(config.getDataQualityProperties());
+ DetectionPipeline pipeline = this.loader.from(this.provider, config, info.getStart(), info.getEnd());
+ DetectionPipelineResult result = pipeline.run();
+ // revert the properties field back to detection properties
+ config.setProperties(props);
+
+ // Save all the data quality anomalies
+ for (MergedAnomalyResultDTO mergedAnomalyResultDTO : result.getAnomalies()) {
+ this.anomalyDAO.save(mergedAnomalyResultDTO);
+ if (mergedAnomalyResultDTO.getId() == null) {
+ LOG.warn("Could not store anomaly:\n{}", mergedAnomalyResultDTO);
+ }
+ }
+
+ ThirdeyeMetricsUtil.dataQualityTaskSuccessCounter.inc();
+ LOG.info("End data quality check for config {} between {} and {}. Detected {} anomalies.", config.getId(),
+ info.getStart(), info.getEnd(), result.getAnomalies());
+ return Collections.emptyList();
+ } catch(Exception e) {
+ ThirdeyeMetricsUtil.dataQualityTaskExceptionCounter.inc();
+ throw e;
+ }
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
new file mode 100644
index 0000000..42d6860
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_SLA anomalies.
+ *
+ * Data SLA is verified based on the following information.
+ * a. The dataset refresh timestamp updated by the event based data availability pipeline (if applicable).
+ * b. Otherwise, we will query the data source and run sla checks.
+ */
+@Components(title = "Data Sla Quality Checker",
+ type = "DATA_SLA",
+ tags = {DetectionTag.RULE_DETECTION},
+ description = "Checks if data is missing or not based on the configured sla",
+ presentation = {
+ @PresentationOption(name = "data sla", template = "is ${sla}")
+ },
+ params = {
+ @Param(name = "sla", placeholder = "value")
+ })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+ private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+ private String sla;
+ private InputDataFetcher dataFetcher;
+ private final String DEFAULT_DATA_SLA = "3_DAYS";
+
+ @Override
+ public DetectionResult runDetection(Interval window, String metricUrn) {
+ return DetectionResult.from(runSLACheck(MetricEntity.fromURN(metricUrn), window));
+ }
+
+ @Override
+ public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+ return TimeSeries.empty();
+ }
+
+ @Override
+ public void init(DataSlaQualityCheckerSpec spec, InputDataFetcher dataFetcher) {
+ this.sla = spec.getSla();
+ this.dataFetcher = dataFetcher;
+ }
+
+ /**
+ * Runs the data sla check for the window on the given metric.
+ */
+ private List<MergedAnomalyResultDTO> runSLACheck(MetricEntity me, Interval window) {
+ List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+ long expectedDatasetRefreshTime = window.getEnd().getMillis();
+ InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+ .withMetricIdsForDataset(Collections.singletonList(me.getId()))
+ .withMetricIds(Collections.singletonList(me.getId())));
+ DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+
+ try {
+ long datasetLastRefreshTime = fetchLatestDatasetRefreshTime(data, me, window);
+ MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, expectedDatasetRefreshTime);
+
+ if (isSLAViolated(datasetLastRefreshTime, expectedDatasetRefreshTime)) {
+ anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+ }
+ } catch (Exception e) {
+ LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
+ }
+
+ return anomalies;
+ }
+
+ /**
+ * Fetches the latest timestamp for the dataset. It relies on 2 sources:
+ * a. Data-trigger/availability based refresh timestamp
+ * b. If not available, check by directly querying the data-source.
+ */
+ private long fetchLatestDatasetRefreshTime(InputData data, MetricEntity me, Interval window) {
+ long startTime = window.getStart().getMillis();
+ long endTime = window.getEnd().getMillis();
+ // Note that we only measure the overall dataset availability. Filters are not considered as the
+ // data availability events fire at the dataset level.
+ MetricSlice metricSlice = MetricSlice.from(me.getId(), startTime, endTime, ArrayListMultimap.create());
+
+ // Fetch dataset refresh time based on the data availability events
+ DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+ long eventBasedRefreshTime = datasetConfig.getLastRefreshTime();
+ if (eventBasedRefreshTime <= 0) {
+ // no availability event -> assume we have processed data till the current detection start
+ eventBasedRefreshTime = startTime - 1;
+ }
+
+ // If the data availability event indicates no data or partial data, we will confirm with the data source.
+ if (eventBasedRefreshTime < startTime || isPartialData(eventBasedRefreshTime, endTime, datasetConfig)) {
+ // Double check with data source. This can happen if,
+ // 1. This dataset/source doesn't not support data trigger/availability events
+ // 2. The data trigger event didn't arrive due to some upstream issue.
+ DataFrame dataFrame = this.dataFetcher.fetchData(new InputDataSpec()
+ .withTimeseriesSlices(Collections.singletonList(metricSlice))).getTimeseries().get(metricSlice);
+ if (dataFrame != null && !dataFrame.isEmpty()) {
+ return dataFrame.getDoubles("timestamp").max().longValue();
+ }
+ }
+
+ return eventBasedRefreshTime;
+ }
+
+ /**
+ * We say the data is partial if we do not have all the data points in the sla detection window.
+ * Or more specifically if we have at least 1 data-point missing in the sla detection window.
+ *
+ * For example:
+ * Assume that the data is delayed and our current sla detection window is [1st Feb to 3rd Feb). During this scan,
+ * let's say data for 1st Feb arrives. Now, we have a situation where partial data is present. In other words, in the
+ * current sla detection window [1st to 3rd Feb) we have data for 1st Feb but data for 2nd Feb is missing.
+ *
+ * Based on this information, we can smartly decide if we need to query the data source or not.
+ */
+ private boolean isPartialData(long actual, long expected, DatasetConfigDTO datasetConfig) {
+ long granularity = datasetConfig.bucketTimeGranularity().toMillis();
+ return (expected - actual) * 1.0 / granularity > 1;
+ }
+
+ /**
+ * Validates if the data is delayed or not based on the user specified SLA configuration
+ *
+ * fetch the user configured SLA, otherwise default 3_DAYS.
+ */
+ private boolean isSLAViolated(long actualRefreshTime, long expectedRefreshTime) {
+ String sla = StringUtils.isNotEmpty(this.sla) ? this.sla : DEFAULT_DATA_SLA;
+ long delay = TimeGranularity.fromString(sla).toPeriod().toStandardDuration().getMillis();
+
+ return (expectedRefreshTime - actualRefreshTime) > delay;
+ }
+
+ /**
+ * Align and round off start time to the upper boundary of the granularity
+ */
+ private static long alignToUpperBoundary(long timestamp, DatasetConfigDTO datasetConfig) {
+ Period granularityPeriod = datasetConfig.bucketTimeGranularity().toPeriod();
+ DateTimeZone timezone = DateTimeZone.forID(datasetConfig.getTimezone());
+ DateTime startTime = new DateTime(timestamp - 1, timezone).plus(granularityPeriod);
+ return (startTime.getMillis() / granularityPeriod.toStandardDuration().getMillis()) * granularityPeriod.toStandardDuration().getMillis();
+ }
+
+ /**
+ * Creates a DATA_SLA anomaly from ceiling(start) to ceiling(end) for the detection id.
+ * If existing DATA_SLA anomalies are present, then it will be merged accordingly.
+ */
+ private MergedAnomalyResultDTO createDataSLAAnomaly(MetricSlice slice, DatasetConfigDTO datasetConfig) {
+ MergedAnomalyResultDTO anomaly = DetectionUtils.makeAnomaly(
+ alignToUpperBoundary(slice.getStart(), datasetConfig),
+ alignToUpperBoundary(slice.getEnd(), datasetConfig));
+ anomaly.setCollection(datasetConfig.getName());
+ anomaly.setType(AnomalyType.DATA_SLA);
+ anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
+
+ // Store the metadata in the anomaly
+ Map<String, String> properties = new HashMap<>();
+ properties.put("datasetLastRefreshTime", String.valueOf(slice.getStart() - 1));
+ properties.put("sla", sla);
+ anomaly.setProperties(properties);
+
+ return anomaly;
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
similarity index 62%
copy from thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
copy to thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
index 206c8bb..edfbc19 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskConstants.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
@@ -17,31 +17,22 @@
* under the License.
*/
-package org.apache.pinot.thirdeye.anomaly.task;
+package org.apache.pinot.thirdeye.detection.dataquality.spec;
-public class TaskConstants {
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.detection.spec.AbstractSpec;
- public enum TaskType {
- DATA_SLA,
- DETECTION,
- DETECTION_ALERT,
- YAML_DETECTION_ONBOARD,
- ANOMALY_DETECTION,
- MERGE,
- // TODO: deprecate ALERT task type
- ALERT,
- ALERT2,
- MONITOR,
- DATA_COMPLETENESS,
- CLASSIFICATION,
- REPLAY
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DataSlaQualityCheckerSpec extends AbstractSpec {
+ private String sla = "3_DAYS";
+
+ public String getSla() {
+ return sla;
}
- public enum TaskStatus {
- WAITING,
- RUNNING,
- COMPLETED,
- FAILED,
- TIMEOUT
+ public void setSla(String sla) {
+ this.sla = sla;
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/wrapper/DataSlaWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/wrapper/DataSlaWrapper.java
new file mode 100644
index 0000000..b0cb0e7
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/wrapper/DataSlaWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.wrapper;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator.*;
+
+
+/**
+ * Wrapper class that is responsible for running the @{link DataSlaQualityChecker}
+ */
+public class DataSlaWrapper extends DetectionPipeline {
+ private static final Logger LOG = LoggerFactory.getLogger(DataSlaWrapper.class);
+
+ private static final String PROP_METRIC_URN = "metricUrn";
+ private static final String PROP_QUALITY_CHECK = "qualityCheck";
+ private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
+
+ private final AnomalyDetector qualityChecker;
+ private final DatasetConfigDTO dataset;
+ private final MetricEntity metricEntity;
+ private final MetricConfigDTO metric;
+ private final String qualityCheckerKey;
+ private final String entityName;
+ private final String metricUrn;
+
+ public DataSlaWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+ super(provider, config, startTime, endTime);
+
+ Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_SUB_ENTITY_NAME));
+ this.entityName = MapUtils.getString(config.getProperties(), PROP_SUB_ENTITY_NAME);
+
+ Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_QUALITY_CHECK));
+ this.qualityCheckerKey = DetectionUtils.getComponentKey(MapUtils.getString(config.getProperties(), PROP_QUALITY_CHECK));
+ Preconditions.checkArgument(this.config.getComponents().containsKey(this.qualityCheckerKey));
+ this.qualityChecker = (AnomalyDetector) this.config.getComponents().get(this.qualityCheckerKey);
+
+ this.metricUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
+ this.metricEntity = MetricEntity.fromURN(this.metricUrn);
+ this.metric = provider.fetchMetrics(Collections.singleton(this.metricEntity.getId())).get(this.metricEntity.getId());
+
+ MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(this.metricEntity.getId())).get(this.metricEntity.getId());
+ this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
+ .get(metricConfigDTO.getDataset());
+ }
+
+ @Override
+ public DetectionPipelineResult run() throws Exception {
+ LOG.info("Check data sla for config {} between {} and {}", config.getId(), startTime, endTime);
+ Interval window = new Interval(startTime, endTime, DateTimeZone.forID(dataset.getTimezone()));
+ DetectionResult detectionResult = qualityChecker.runDetection(window, this.metricUrn);
+ List<MergedAnomalyResultDTO> anomalies = new ArrayList<>(detectionResult.getAnomalies());
+
+ for (MergedAnomalyResultDTO anomaly : anomalies) {
+ anomaly.setDetectionConfigId(this.config.getId());
+ anomaly.setMetricUrn(this.metricUrn);
+ anomaly.setMetric(this.metric.getName());
+ anomaly.setCollection(this.metric.getDataset());
+ anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
+ anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.qualityCheckerKey);
+ anomaly.getProperties().put(PROP_SUB_ENTITY_NAME, this.entityName);
+ }
+
+ return new DetectionPipelineResult(anomalies);
+ }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunner.java
deleted file mode 100644
index 359a295..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunner.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.datasla;
-
-import com.google.common.collect.ArrayListMultimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.pinot.thirdeye.anomaly.AnomalyType;
-import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
-import org.apache.pinot.thirdeye.anomaly.task.TaskInfo;
-import org.apache.pinot.thirdeye.anomaly.task.TaskResult;
-import org.apache.pinot.thirdeye.anomaly.task.TaskRunner;
-import org.apache.pinot.thirdeye.common.time.TimeGranularity;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
-import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.pojo.MergedAnomalyResultBean;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
-import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
-import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
-import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
-import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
-import org.apache.pinot.thirdeye.detection.ConfigUtils;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
-import org.apache.pinot.thirdeye.detection.DetectionUtils;
-import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
-import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil.*;
-
-
-/**
- * Runner that generates Data SLA anomalies. DATA_MISSING anomalies are created if
- * the data is not available for the sla detection window within the configured SLA.
- */
-public class DatasetSlaTaskRunner implements TaskRunner {
- private static final Logger LOG = LoggerFactory.getLogger(DatasetSlaTaskRunner.class);
-
- private final DetectionConfigManager detectionDAO;
- private final MergedAnomalyResultManager anomalyDAO;
- private final EvaluationManager evaluationDAO;
- private DataProvider provider;
-
- private final String DEFAULT_DATA_SLA = "3_DAYS";
-
- public DatasetSlaTaskRunner() {
- this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
- this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
- this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
-
- MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
- DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
- EventManager eventDAO = DAORegistry.getInstance().getEventDAO();
-
- TimeSeriesLoader timeseriesLoader =
- new DefaultTimeSeriesLoader(metricDAO, datasetDAO,
- ThirdEyeCacheRegistry.getInstance().getQueryCache(), ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
-
- AggregationLoader aggregationLoader =
- new DefaultAggregationLoader(metricDAO, datasetDAO,
- ThirdEyeCacheRegistry.getInstance().getQueryCache(),
- ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
-
- this.provider = new DefaultDataProvider(metricDAO, datasetDAO, eventDAO, this.anomalyDAO, this.evaluationDAO,
- timeseriesLoader, aggregationLoader, new DetectionPipelineLoader(), TimeSeriesCacheBuilder.getInstance(),
- AnomaliesCacheBuilder.getInstance());
- }
-
- public DatasetSlaTaskRunner(DetectionConfigManager detectionDAO, MergedAnomalyResultManager anomalyDAO,
- EvaluationManager evaluationDAO, DataProvider provider) {
- this.detectionDAO = detectionDAO;
- this.anomalyDAO = anomalyDAO;
- this.evaluationDAO = evaluationDAO;
- this.provider = provider;
- }
-
- @Override
- public List<TaskResult> execute(TaskInfo taskInfo, TaskContext taskContext) throws Exception {
- dataAvailabilityTaskCounter.inc();
-
- try {
- DetectionPipelineTaskInfo info = (DetectionPipelineTaskInfo) taskInfo;
-
- DetectionConfigDTO config = this.detectionDAO.findById(info.getConfigId());
- if (config == null) {
- throw new IllegalArgumentException(String.format("Could not resolve config id %d", info.getConfigId()));
- }
-
- LOG.info("Check data sla for config {} between {} and {}", config.getId(), info.getStart(), info.getEnd());
- Map<String, Object> metricUrnToSlaMap = config.getDataSLAProperties();
- if (MapUtils.isNotEmpty(metricUrnToSlaMap)) {
- for (Map.Entry<String, Object> metricUrnToSlaMapEntry : metricUrnToSlaMap.entrySet()) {
- MetricEntity me = MetricEntity.fromURN(metricUrnToSlaMapEntry.getKey());
- MetricConfigDTO
- metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
- Map<String, DatasetConfigDTO> datasetToConfigMap = provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()));
- for (Map.Entry<String, DatasetConfigDTO> datasetSLA : datasetToConfigMap.entrySet()) {
- try {
- runSLACheck(me, datasetSLA.getValue(), info, ConfigUtils.getMap(metricUrnToSlaMapEntry.getValue()));
- } catch (Exception e) {
- LOG.error("Failed to run sla check on metric URN %s", datasetSLA.getKey(), e);
- }
- }
- }
- }
-
- //LOG.info("End data availability for config {} between {} and {}. Detected {} anomalies.", config.getId(),
- // info.getStart(), info.getEnd(), anomaliesList.size());
- return Collections.emptyList();
- } catch(Exception e) {
- throw e;
- }
- }
-
- /**
- * Runs the data sla check for the window (info) on the given metric (me) using the configured sla properties
- */
- private void runSLACheck(MetricEntity me, DatasetConfigDTO datasetConfig, DetectionPipelineTaskInfo info,
- Map<String, Object> slaProps) {
- if (me == null || datasetConfig == null || info == null) {
- //nothing to check
- return;
- }
-
- try {
- long datasetLastRefreshTime = datasetConfig.getLastRefreshTime();
- if (datasetLastRefreshTime <= 0) {
- // assume we have processed data till the current detection start
- datasetLastRefreshTime = info.getStart() - 1;
- }
-
- if (isMissingData(datasetLastRefreshTime, info)) {
- // Double check with data source as 2 things are possible.
- // 1. This dataset/source may not support availability events
- // 2. The data availability event pipeline has some issue.
- // We want to measure the overall dataset availability (filters are not taken into account)
- MetricSlice metricSlice = MetricSlice.from(me.getId(), info.getStart(), info.getEnd(), ArrayListMultimap.<String, String>create());
- DataFrame dataFrame = this.provider.fetchTimeseries(Collections.singleton(metricSlice)).get(metricSlice);
- if (dataFrame == null || dataFrame.isEmpty()) {
- // no data
- if (hasMissedSLA(datasetLastRefreshTime, slaProps, info.getEnd())) {
- createDataSLAAnomaly(datasetLastRefreshTime + 1, info.getEnd(), info.getConfigId(), datasetConfig, slaProps);
- }
- } else {
- datasetLastRefreshTime = dataFrame.getDoubles("timestamp").max().longValue();
- if (isPartialData(datasetLastRefreshTime, info, datasetConfig)) {
- if (hasMissedSLA(datasetLastRefreshTime, slaProps, info.getEnd())) {
- createDataSLAAnomaly(datasetLastRefreshTime + 1, info.getEnd(), info.getConfigId(), datasetConfig, slaProps);
- }
- }
- }
- } else if (isPartialData(datasetLastRefreshTime, info, datasetConfig)) {
- // Optimize for the common case - the common case is that the data availability events are arriving
- // correctly and we need not re-fetch the data to double check.
- if (hasMissedSLA(datasetLastRefreshTime, slaProps, info.getEnd())) {
- createDataSLAAnomaly(datasetLastRefreshTime + 1, info.getEnd(), info.getConfigId(), datasetConfig, slaProps);
- }
- }
- } catch (Exception e) {
- LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
- }
- }
-
- /**
- * We say the data is missing we do not have data in the sla detection window.
- * Or more specifically if the dataset watermark is below the sla detection window.
- */
- private boolean isMissingData(long datasetLastRefreshTime, DetectionPipelineTaskInfo info) {
- return datasetLastRefreshTime < info.getStart();
- }
-
- /**
- * We say the data is partial if we do not have all the data points in the sla detection window.
- * Or more specifically if we have at least 1 data-point missing in the sla detection window.
- *
- * For example:
- * Assume that the data is delayed and our current sla detection window is [1st Feb to 3rd Feb). During this scan,
- * let's say data for 1st Feb arrives. Now, we have a situation where partial data is present. In other words, in the
- * current sla detection window [1st to 3rd Feb) we have data for 1st Feb but data for 2nd Feb is missing. This is the
- * partial data scenario.
- */
- private boolean isPartialData(long datasetLastRefreshTime, DetectionPipelineTaskInfo info, DatasetConfigDTO datasetConfig) {
- long granularity = datasetConfig.bucketTimeGranularity().toMillis();
- return (info.getEnd() - datasetLastRefreshTime) * 1.0 / granularity > 1;
- }
-
- /**
- * Validates if the data is delayed or not based on the user specified SLA configuration
- */
- private boolean hasMissedSLA(long datasetLastRefreshTime, Map<String, Object> slaProps, long slaDetectionEndTime) {
- // fetch the user configured SLA, otherwise default 3_DAYS.
- long delay = TimeGranularity.fromString(MapUtils.getString(slaProps, "sla", DEFAULT_DATA_SLA))
- .toPeriod().toStandardDuration().getMillis();
-
- return (slaDetectionEndTime - datasetLastRefreshTime) >= delay;
- }
-
- /**
- * Align and round off start time to the upper boundary of the granularity
- */
- private static long alignToUpperBoundary(long start, DatasetConfigDTO datasetConfig) {
- Period granularityPeriod = datasetConfig.bucketTimeGranularity().toPeriod();
- DateTimeZone timezone = DateTimeZone.forID(datasetConfig.getTimezone());
- DateTime startTime = new DateTime(start - 1, timezone).plus(granularityPeriod);
- return startTime.getMillis() / granularityPeriod.toStandardDuration().getMillis() * granularityPeriod.toStandardDuration().getMillis();
- }
-
- /**
- * Merges one DATA_MISSING anomaly with remaining existing anomalies.
- */
- private void mergeSLAAnomalies(MergedAnomalyResultDTO anomaly, List<MergedAnomalyResultDTO> existingAnomalies) {
- // Extract the parent SLA anomaly. We can have only 1 parent DATA_MISSING anomaly in a window.
- existingAnomalies.removeIf(MergedAnomalyResultBean::isChild);
- MergedAnomalyResultDTO existingParentSLAAnomaly = existingAnomalies.get(0);
-
- if (isDuplicateSLAAnomaly(existingParentSLAAnomaly, anomaly)) {
- // Ensure anomalies are not duplicated. Ignore and return.
- // Example: daily data with hourly cron should generate only 1 sla alert if data is missing
- return;
- }
- existingParentSLAAnomaly.setChild(true);
- anomaly.setChild(false);
- anomaly.setChildren(Collections.singleton(existingParentSLAAnomaly));
- this.anomalyDAO.save(anomaly);
- if (anomaly.getId() == null) {
- LOG.warn("Could not store data sla check failed anomaly:\n{}", anomaly);
- }
- }
-
- /**
- * Creates a DATA_MISSING anomaly from ceiling(start) to ceiling(end) for the detection id.
- * If existing DATA_MISSING anomalies are present, then it will be merged accordingly.
- */
- private void createDataSLAAnomaly(long start, long end, long detectionId, DatasetConfigDTO datasetConfig,
- Map<String, Object> slaProps) {
- MergedAnomalyResultDTO anomaly = DetectionUtils.makeAnomaly(
- alignToUpperBoundary(start, datasetConfig),
- alignToUpperBoundary(end, datasetConfig),
- detectionId);
- anomaly.setType(AnomalyType.DATA_MISSING);
-
- // Store the metadata in the anomaly
- Map<String, String> properties = new HashMap<>();
- properties.put("datasetLastRefreshTime", String.valueOf(start - 1));
- slaProps.forEach((k, v) -> {
- properties.put(k, v.toString());
- });
- anomaly.setProperties(properties);
-
- List<MergedAnomalyResultDTO> existingAnomalies = anomalyDAO.findAnomaliesWithinBoundary(start, end, detectionId);
- if (!existingAnomalies.isEmpty()) {
- mergeSLAAnomalies(anomaly, existingAnomalies);
- } else {
- // no merging required
- this.anomalyDAO.save(anomaly);
- if (anomaly.getId() == null) {
- LOG.warn("Could not store data sla check failed anomaly:\n{}", anomaly);
- }
- }
- }
-
- /**
- * We say one DATA_MISSING anomaly is a duplicate of another if they belong to the same detection with the same type
- * and span the exact same duration.
- */
- private boolean isDuplicateSLAAnomaly(MergedAnomalyResultDTO anomaly1, MergedAnomalyResultDTO anomaly2) {
- if (anomaly1 == null && anomaly2 == null) {
- return true;
- }
-
- if (anomaly1 == null || anomaly2 == null) {
- return false;
- }
-
- // anomalies belong to the same detection, same type & span over the same duration
- return anomaly1.getDetectionConfigId().equals(anomaly2.getDetectionConfigId())
- && anomaly1.getType() == anomaly2.getType()
- && anomaly1.getStartTime() == anomaly2.getStartTime()
- && anomaly1.getEndTime() == anomaly2.getEndTime();
- }
-}
\ No newline at end of file
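For readers skimming the removed runner above, its SLA check reduces to three comparisons against the dataset watermark (last refresh time): the window has no data at all, the window has at least one bucket missing, and the configured delay has elapsed. The standalone sketch below restates that arithmetic; the class and method names are illustrative only and are not ThirdEye APIs.

```
import java.time.Duration;

/**
 * Illustrative-only sketch of the SLA arithmetic in the removed runner above;
 * class and method names here are NOT ThirdEye APIs.
 */
public class SlaCheckSketch {

  /** Data is "missing" when the dataset watermark is below the detection window start. */
  static boolean isMissingData(long watermarkMillis, long windowStartMillis) {
    return watermarkMillis < windowStartMillis;
  }

  /** Data is "partial" when at least one granularity bucket before the window end is absent. */
  static boolean isPartialData(long watermarkMillis, long windowEndMillis, long granularityMillis) {
    return (windowEndMillis - watermarkMillis) * 1.0 / granularityMillis > 1;
  }

  /** The SLA is missed once the window end is at least the configured delay past the watermark. */
  static boolean hasMissedSla(long watermarkMillis, long windowEndMillis, Duration sla) {
    return (windowEndMillis - watermarkMillis) >= sla.toMillis();
  }

  public static void main(String[] args) {
    long day = 24L * 3_600_000L;
    long windowStart = 10 * day, windowEnd = 13 * day; // a 3-day detection window
    long watermark = 11 * day;                         // data has arrived only for the first day

    System.out.println(isMissingData(watermark, windowStart));                   // false
    System.out.println(isPartialData(watermark, windowEnd, day));                // true (2 buckets missing)
    System.out.println(hasMissedSla(watermark, windowEnd, Duration.ofDays(1)));  // true under a 1-day SLA
  }
}
```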
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java
index fbefa5f..b3cee3e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DetectionConfigValidator.java
@@ -177,6 +177,7 @@ public class DetectionConfigValidator implements ConfigValidator<DetectionConfig
// Validate detection rules
Preconditions.checkArgument(ruleYaml.containsKey(PROP_DETECTION),
"Detection rule missing for sub-alert " + alertName + " rule no. " + ruleIndex);
+ // Validate detection rules
List<Map<String, Object>> detectionRuleYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
for (Map<String, Object> detectionRuleYaml : detectionRuleYamls) {
validateRule(alertName, detectionRuleYaml, ruleIndex, "detection", names);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 26bd38a..df5f593 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Collections2;
import java.util.HashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
import org.apache.pinot.thirdeye.dataframe.Series;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
@@ -145,6 +146,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
Collections2.filter(retrieved,
mergedAnomaly -> mergedAnomaly != null &&
!mergedAnomaly.isChild() &&
+ mergedAnomaly.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION) &&
// merge if only the anomaly generated by the same detector
this.detectorComponentName.equals(mergedAnomaly.getProperties().getOrDefault(PROP_DETECTOR_COMPONENT_NAME, "")) &&
// merge if only the anomaly is in the same dimension
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 4455a63..d2a1320 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.detection.DataProvider;
@@ -59,8 +60,9 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
Collection<MergedAnomalyResultDTO> anomalies =
this.provider.fetchAnomalies(Collections.singleton(effectiveSlice)).get(effectiveSlice);
- return anomalies.stream()
- .filter(anomaly -> !anomaly.isChild() && ThirdEyeUtils.isDetectedByMultipleComponents(anomaly))
+ return anomalies.stream().filter(anomaly -> !anomaly.isChild()
+ && anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DEFAULT_ANOMALY_DETECTION)
+ && ThirdEyeUtils.isDetectedByMultipleComponents(anomaly))
.collect(Collectors.toList());
}
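Both merge-wrapper changes above add the same guard: only top-level anomalies whose result source is DEFAULT_ANOMALY_DETECTION are considered for merging, which keeps the new DATA_QUALITY_DETECTION anomalies out of the regular detection merge path. A minimal standalone sketch of that predicate follows; the enum and record here are illustrative, not the ThirdEye classes.

```
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative predicate mirroring the filters added above; not the ThirdEye classes. */
public class MergeFilterSketch {
  enum ResultSource { DEFAULT_ANOMALY_DETECTION, DATA_QUALITY_DETECTION }

  record Anomaly(boolean child, ResultSource source) {}

  /** Keep only top-level anomalies produced by regular detection; data-quality anomalies are left alone. */
  static List<Anomaly> mergeCandidates(List<Anomaly> retrieved) {
    return retrieved.stream()
        .filter(a -> !a.child() && a.source() == ResultSource.DEFAULT_ANOMALY_DETECTION)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Anomaly> all = List.of(
        new Anomaly(false, ResultSource.DEFAULT_ANOMALY_DETECTION),
        new Anomaly(false, ResultSource.DATA_QUALITY_DETECTION),    // excluded: data-quality anomaly
        new Anomaly(true, ResultSource.DEFAULT_ANOMALY_DETECTION)); // excluded: child anomaly
    System.out.println(mergeCandidates(all).size()); // 1
  }
}
```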
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java
new file mode 100644
index 0000000..23540e1
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.wrapper;
+
+import com.google.common.collect.Collections2;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
+import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+
+
+/**
+ * The Data Quality Merge Wrapper. This merge wrapper is designed specifically with sla anomalies in mind.
+ * We might need to revisit this when more data quality rules are added. Fundamentally, data sla anomalies are never
+ * merged, because we want to keep re-notifying users after every detection run for as long as the sla is missed. This
+ * merger ensures no duplicate sla anomalies are created when the detection runs more frequently than the dataset
+ * granularity, and it serves as a placeholder for future merging logic.
+ */
+public class DataQualityMergeWrapper extends MergeWrapper {
+ private static final String PROP_GROUP_KEY = "groupKey";
+
+ public DataQualityMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+ super(provider, config, startTime, endTime);
+ }
+
+ @Override
+ protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+ List<MergedAnomalyResultDTO> retrieved = super.retrieveAnomaliesFromDatabase(generated);
+
+ return new ArrayList<>(Collections2.filter(retrieved,
+ anomaly -> !anomaly.isChild() &&
+ anomaly.getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION)
+ ));
+ }
+
+ @Override
+ protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
+ List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
+ input.sort(MergeWrapper.COMPARATOR);
+
+ List<MergedAnomalyResultDTO> output = new ArrayList<>();
+ Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>();
+ for (MergedAnomalyResultDTO anomaly : input) {
+ if (anomaly.isChild()) {
+ continue;
+ }
+
+ // Prevent merging of grouped anomalies
+ String groupKey = "";
+ if (anomaly.getProperties().containsKey(PROP_GROUP_KEY)) {
+ groupKey = anomaly.getProperties().get(PROP_GROUP_KEY);
+ }
+ AnomalyKey key = new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), groupKey,
+ "", anomaly.getType());
+
+ MergedAnomalyResultDTO parent = parents.get(key);
+ if (parent == null) {
+ parents.put(key, anomaly);
+ output.add(anomaly);
+ } else if (anomaly.getEndTime() <= parent.getEndTime()) {
+ // Ignore as an sla anomaly already exists - we do not want a duplicate.
+ // This can happen if the detection is setup to run more frequently than
+ // the dataset granularity
+ } else {
+ // save the anomaly
+ parents.put(key, anomaly);
+ output.add(anomaly);
+ }
+ }
+
+ return output;
+ }
+}
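The merge() pass above never combines SLA anomalies: a new anomaly is dropped only when one with the same key already covers its end time, and otherwise it is emitted again so users keep getting notified while the SLA remains missed. A simplified standalone sketch of that rule (names are illustrative, not ThirdEye code):

```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of the "drop duplicates, keep re-notifying" pass above; not ThirdEye code. */
public class SlaDedupSketch {
  record SlaAnomaly(String key, long start, long end) {}

  static List<SlaAnomaly> dedup(List<SlaAnomaly> sortedByStart) {
    List<SlaAnomaly> output = new ArrayList<>();
    Map<String, SlaAnomaly> latestPerKey = new HashMap<>();
    for (SlaAnomaly a : sortedByStart) {
      SlaAnomaly last = latestPerKey.get(a.key());
      if (last != null && a.end() <= last.end()) {
        continue; // the same window was already reported: skip the duplicate
      }
      latestPerKey.put(a.key(), a); // new or later window: emit again so users are re-notified
      output.add(a);
    }
    return output;
  }

  public static void main(String[] args) {
    List<SlaAnomaly> in = List.of(
        new SlaAnomaly("metric1", 0, 100),
        new SlaAnomaly("metric1", 0, 100),   // duplicate from an hourly cron on daily data
        new SlaAnomaly("metric1", 0, 200));  // SLA still missed on the next run: kept
    System.out.println(dedup(in).size()); // 2
  }
}
```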
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java
index 1610818..7304b21 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigTranslator.java
@@ -20,45 +20,26 @@
package org.apache.pinot.thirdeye.detection.yaml.translator;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.MapUtils;
-import org.apache.pinot.thirdeye.common.time.TimeGranularity;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource;
import org.apache.pinot.thirdeye.detection.ConfigUtils;
import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionUtils;
-import org.apache.pinot.thirdeye.detection.algorithm.DimensionWrapper;
-import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
-import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.AnomalyFilterWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.BaselineFillingMergeWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper;
-import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-
-import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
-import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
+import org.apache.pinot.thirdeye.detection.yaml.translator.builder.DetectionConfigPropertiesBuilder;
+import org.apache.pinot.thirdeye.detection.yaml.translator.builder.DataQualityPropertiesBuilder;
+import org.apache.pinot.thirdeye.detection.yaml.translator.builder.DetectionPropertiesBuilder;
/**
* The YAML translator of composite pipeline. Convert YAML file into detection pipeline properties.
* This pipeline supports multiple detection algorithm running in OR relationship.
* Supports running multiple filter rule running in AND relationship.
- * Conceptually, the pipeline has the following structure. (Grouper not implemented yet).
+ * Conceptually, the pipeline has the following structure.
*
* +-----------+
* | Dimension |
@@ -95,7 +76,7 @@ import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
* |
* |
* +-----v-----+
- * | Grouper* |
+ * | Grouper |
* | |
* +-----------+
*
@@ -128,53 +109,22 @@ import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigDTO, DetectionConfigValidator> {
public static final String PROP_SUB_ENTITY_NAME = "subEntityName";
- public static final String PROP_DIMENSION_EXPLORATION = "dimensionExploration";
- public static final String PROP_FILTERS = "filters";
-
- private static final String PROP_DETECTION = "detection";
- private static final String PROP_CRON = "cron";
- private static final String PROP_FILTER = "filter";
- private static final String PROP_TYPE = "type";
- private static final String PROP_CLASS_NAME = "className";
- private static final String PROP_PARAMS = "params";
- private static final String PROP_METRIC_URN = "metricUrn";
- private static final String PROP_DIMENSION_FILTER_METRIC = "dimensionFilterMetric";
- private static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
- private static final String PROP_RULES = "rules";
- private static final String PROP_GROUPER = "grouper";
- private static final String PROP_NESTED = "nested";
- private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
- private static final String PROP_DETECTOR = "detector";
- private static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
- private static final String PROP_WINDOW_DELAY = "windowDelay";
- private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
- private static final String PROP_WINDOW_SIZE = "windowSize";
- private static final String PROP_WINDOW_UNIT = "windowUnit";
- private static final String PROP_FREQUENCY = "frequency";
- private static final String PROP_MERGER = "merger";
- private static final String PROP_TIMEZONE = "timezone";
- private static final String PROP_NAME = "name";
- private static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
- private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
- private static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
+ static final String PROP_CRON = "cron";
+ static final String PROP_TYPE = "type";
+ static final String PROP_NAME = "name";
private static final String PROP_DETECTION_NAME = "detectionName";
private static final String PROP_DESC_NAME = "description";
private static final String PROP_ACTIVE = "active";
private static final String PROP_OWNERS = "owners";
- private static final String PROP_ALERTS = "alerts";
- private static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
- private static final String METRIC_ALERT = "METRIC_ALERT";
-
+ static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
+ static final String METRIC_ALERT = "METRIC_ALERT";
- private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
- private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM");
-
- private final Map<String, Object> components = new HashMap<>();
private DataProvider dataProvider;
+ private DetectionConfigPropertiesBuilder detectionTranslatorBuilder;
+ private DetectionConfigPropertiesBuilder dataQualityTranslatorBuilder;
private DetectionMetricAttributeHolder metricAttributesMap;
- private Set<DatasetConfigDTO> datasetConfigs;
public DetectionConfigTranslator(String yamlConfig, DataProvider provider) {
this(yamlConfig, provider, new DetectionConfigValidator(provider));
@@ -184,99 +134,8 @@ public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigD
super(yamlConfig, validator);
this.dataProvider = provider;
this.metricAttributesMap = new DetectionMetricAttributeHolder(provider);
- this.datasetConfigs = new HashSet<>();
- }
-
- private Map<String, Object> translateMetricAlert(Map<String, Object> metricAlertConfigMap) {
- String subEntityName = MapUtils.getString(metricAlertConfigMap, PROP_NAME);
-
- DatasetConfigDTO datasetConfigDTO = metricAttributesMap.fetchDataset(metricAlertConfigMap);
- datasetConfigs.add(datasetConfigDTO);
- Map<String, Collection<String>> dimensionFiltersMap = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_FILTERS));
- String metricUrn = MetricEntity.fromMetric(dimensionFiltersMap, metricAttributesMap.fetchMetric(metricAlertConfigMap).getId()).getUrn();
- Map<String, Object> mergerProperties = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_MERGER));
-
- // Translate all the rules
- List<Map<String, Object>> ruleYamls = getList(metricAlertConfigMap.get(PROP_RULES));
- List<Map<String, Object>> nestedPipelines = new ArrayList<>();
- for (Map<String, Object> ruleYaml : ruleYamls) {
- List<Map<String, Object>> filterYamls = ConfigUtils.getList(ruleYaml.get(PROP_FILTER));
- List<Map<String, Object>> detectionYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
- List<Map<String, Object>> detectionProperties = buildListOfMergeWrapperProperties(
- subEntityName, metricUrn, detectionYamls, mergerProperties,
- datasetConfigDTO.bucketTimeGranularity());
- if (filterYamls.isEmpty()) {
- nestedPipelines.addAll(detectionProperties);
- } else {
- List<Map<String, Object>> filterNestedProperties = detectionProperties;
- for (Map<String, Object> filterProperties : filterYamls) {
- filterNestedProperties = buildFilterWrapperProperties(metricUrn, AnomalyFilterWrapper.class.getName(), filterProperties,
- filterNestedProperties);
- }
- nestedPipelines.addAll(filterNestedProperties);
- }
- }
-
- // Wrap with dimension exploration properties
- Map<String, Object> dimensionWrapperProperties = buildDimensionWrapperProperties(
- metricAlertConfigMap, dimensionFiltersMap, metricUrn, datasetConfigDTO.getDataset());
- Map<String, Object> properties = buildWrapperProperties(
- ChildKeepingMergeWrapper.class.getName(),
- Collections.singletonList(buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)),
- mergerProperties);
-
- // Wrap with metric level grouper, restricting to only 1 grouper
- List<Map<String, Object>> grouperYamls = getList(metricAlertConfigMap.get(PROP_GROUPER));
- if (!grouperYamls.isEmpty()) {
- properties = buildWrapperProperties(
- EntityAnomalyMergeWrapper.class.getName(),
- Collections.singletonList(buildGroupWrapperProperties(subEntityName, metricUrn, grouperYamls.get(0), Collections.singletonList(properties))),
- mergerProperties);
-
- properties = buildWrapperProperties(
- ChildKeepingMergeWrapper.class.getName(),
- Collections.singletonList(properties),
- mergerProperties);
- }
-
- return properties;
- }
-
- private Map<String, Object> translateCompositeAlert(Map<String, Object> compositeAlertConfigMap) {
- Map<String, Object> properties;
- String subEntityName = MapUtils.getString(compositeAlertConfigMap, PROP_NAME);
-
- // Recursively translate all the sub-alerts
- List<Map<String, Object>> subDetectionYamls = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_ALERTS));
- List<Map<String, Object>> nestedPropertiesList = new ArrayList<>();
- for (Map<String, Object> subDetectionYaml : subDetectionYamls) {
- Map<String, Object> subProps;
- if (subDetectionYaml.containsKey(PROP_TYPE) && subDetectionYaml.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
- subProps = translateCompositeAlert(subDetectionYaml);
- } else {
- subProps = translateMetricAlert(subDetectionYaml);
- }
-
- nestedPropertiesList.add(subProps);
- }
-
- // Wrap the entity level grouper, only 1 grouper is supported now
- List<Map<String, Object>> grouperProps = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_GROUPER));
- Map<String, Object> mergerProperties = ConfigUtils.getMap(compositeAlertConfigMap.get(PROP_MERGER));
- if (!grouperProps.isEmpty()) {
- properties = buildWrapperProperties(
- EntityAnomalyMergeWrapper.class.getName(),
- Collections.singletonList(buildGroupWrapperProperties(subEntityName, grouperProps.get(0), nestedPropertiesList)),
- mergerProperties);
- nestedPropertiesList = Collections.singletonList(properties);;
- }
-
- properties = buildWrapperProperties(
- ChildKeepingMergeWrapper.class.getName(),
- nestedPropertiesList,
- mergerProperties);
-
- return properties;
+ this.detectionTranslatorBuilder = new DetectionPropertiesBuilder(metricAttributesMap, provider);
+ this.dataQualityTranslatorBuilder = new DataQualityPropertiesBuilder(metricAttributesMap, provider);
}
@Override
@@ -291,10 +150,12 @@ public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigD
yamlConfigMap.putIfAbsent(PROP_TYPE, METRIC_ALERT);
// Translate config depending on the type (METRIC_ALERT OR COMPOSITE_ALERT)
- Map<String, Object> properties;
+ Map<String, Object> detectionProperties;
+ Map<String, Object> qualityProperties;
String cron;
if (yamlConfigMap.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
- properties = translateCompositeAlert(yamlConfigMap);
+ detectionProperties = detectionTranslatorBuilder.buildCompositeAlertProperties(yamlConfigMap);
+ qualityProperties = dataQualityTranslatorBuilder.buildCompositeAlertProperties(yamlConfigMap);
// TODO: discuss strategy for default cron
Preconditions.checkArgument(yamlConfigMap.containsKey(PROP_CRON), "Missing property (" + PROP_CRON + ") in alert");
@@ -302,238 +163,35 @@ public class DetectionConfigTranslator extends ConfigTranslator<DetectionConfigD
} else {
// The legacy type 'COMPOSITE' will be treated as a metric alert along with the new convention METRIC_ALERT.
// This is applicable only at the root level to maintain backward compatibility.
- properties = translateMetricAlert(yamlConfigMap);
+ detectionProperties = detectionTranslatorBuilder.buildMetricAlertProperties(yamlConfigMap);
+ qualityProperties = dataQualityTranslatorBuilder.buildMetricAlertProperties(yamlConfigMap);
cron = metricAttributesMap.fetchCron(yamlConfigMap);
}
- return generateDetectionConfig(yamlConfigMap, properties, this.components, cron);
- }
-
- private Map<String, Object> buildDimensionWrapperProperties(Map<String, Object> yamlConfigMap,
- Map<String, Collection<String>> dimensionFilters, String metricUrn, String datasetName) {
- Map<String, Object> dimensionWrapperProperties = new HashMap<>();
- dimensionWrapperProperties.put(PROP_NESTED_METRIC_URNS, Collections.singletonList(metricUrn));
- if (yamlConfigMap.containsKey(PROP_DIMENSION_EXPLORATION)) {
- Map<String, Object> dimensionExploreYaml = ConfigUtils.getMap(yamlConfigMap.get(PROP_DIMENSION_EXPLORATION));
- dimensionWrapperProperties.putAll(dimensionExploreYaml);
- if (dimensionExploreYaml.containsKey(PROP_DIMENSION_FILTER_METRIC)){
- MetricConfigDTO dimensionExploreMetric = this.dataProvider.fetchMetric(MapUtils.getString(dimensionExploreYaml, PROP_DIMENSION_FILTER_METRIC), datasetName);
- dimensionWrapperProperties.put(PROP_METRIC_URN, MetricEntity.fromMetric(dimensionFilters, dimensionExploreMetric.getId()).getUrn());
- } else {
- dimensionWrapperProperties.put(PROP_METRIC_URN, metricUrn);
- }
- }
- return dimensionWrapperProperties;
- }
-
- private List<Map<String, Object>> buildListOfMergeWrapperProperties(String subEntityName, String metricUrn,
- List<Map<String, Object>> yamlConfigs, Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
- List<Map<String, Object>> properties = new ArrayList<>();
- for (Map<String, Object> yamlConfig : yamlConfigs) {
- properties.add(buildMergeWrapperProperties(subEntityName, metricUrn, yamlConfig, mergerProperties, datasetTimegranularity));
- }
- return properties;
- }
-
- private Map<String, Object> buildMergeWrapperProperties(String subEntityName, String metricUrn, Map<String, Object> yamlConfig,
- Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
- String detectorType = MapUtils.getString(yamlConfig, PROP_TYPE);
- String name = MapUtils.getString(yamlConfig, PROP_NAME);
- Map<String, Object> nestedProperties = new HashMap<>();
- nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
- nestedProperties.put(PROP_SUB_ENTITY_NAME, subEntityName);
- String detectorRefKey = makeComponentRefKey(detectorType, name);
-
- fillInDetectorWrapperProperties(nestedProperties, yamlConfig, detectorType, datasetTimegranularity);
-
- buildComponentSpec(metricUrn, yamlConfig, detectorType, detectorRefKey);
-
- Map<String, Object> properties = new HashMap<>();
- properties.put(PROP_CLASS_NAME, BaselineFillingMergeWrapper.class.getName());
- properties.put(PROP_NESTED, Collections.singletonList(nestedProperties));
- properties.put(PROP_DETECTOR, detectorRefKey);
-
- // fill in baseline provider properties
- if (DETECTION_REGISTRY.isBaselineProvider(detectorType)) {
- // if the detector implements the baseline provider interface, use it to generate baseline
- properties.put(PROP_BASELINE_PROVIDER, detectorRefKey);
- } else {
- String baselineProviderType = DEFAULT_BASELINE_PROVIDER_YAML_TYPE;
- String baselineProviderKey = makeComponentRefKey(baselineProviderType, name);
- buildComponentSpec(metricUrn, yamlConfig, baselineProviderType, baselineProviderKey);
- properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
- }
- properties.putAll(mergerProperties);
- return properties;
- }
-
- private Map<String, Object> buildGroupWrapperProperties(String entityName, Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
- return buildGroupWrapperProperties(entityName, null, grouperYaml, nestedProps);
- }
-
- private Map<String, Object> buildGroupWrapperProperties(String entityName, String metricUrn,
- Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
- Map<String, Object> properties = new HashMap<>();
- properties.put(PROP_CLASS_NAME, GrouperWrapper.class.getName());
- properties.put(PROP_NESTED, nestedProps);
- properties.put(PROP_SUB_ENTITY_NAME, entityName);
-
- String grouperType = MapUtils.getString(grouperYaml, PROP_TYPE);
- String grouperName = MapUtils.getString(grouperYaml, PROP_NAME);
- String grouperRefKey = makeComponentRefKey(grouperType, grouperName);
- properties.put(PROP_GROUPER, grouperRefKey);
-
- buildComponentSpec(metricUrn, grouperYaml, grouperType, grouperRefKey);
-
- return properties;
- }
-
- // fill in window size and unit if detector requires this
- private void fillInDetectorWrapperProperties(Map<String, Object> properties, Map<String, Object> yamlConfig, String detectorType, TimeGranularity datasetTimegranularity) {
- // set default bucketPeriod
- properties.put(PROP_BUCKET_PERIOD, datasetTimegranularity.toPeriod().toString());
-
- // override bucketPeriod now since it is needed by detection window
- if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
- properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig, PROP_BUCKET_PERIOD));
- }
-
- // set default detection window
- setDefaultDetectionWindow(properties, detectorType);
-
- // override other properties from yaml
- if (yamlConfig.containsKey(PROP_WINDOW_SIZE)) {
- properties.put(PROP_MOVING_WINDOW_DETECTION, true);
- properties.put(PROP_WINDOW_SIZE, MapUtils.getString(yamlConfig, PROP_WINDOW_SIZE));
- }
- if (yamlConfig.containsKey(PROP_WINDOW_UNIT)) {
- properties.put(PROP_MOVING_WINDOW_DETECTION, true);
- properties.put(PROP_WINDOW_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_UNIT));
- }
- if (yamlConfig.containsKey(PROP_WINDOW_DELAY)) {
- properties.put(PROP_WINDOW_DELAY, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY));
- }
- if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
- properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
- }
- if (yamlConfig.containsKey(PROP_TIMEZONE)){
- properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
- }
- if (yamlConfig.containsKey(PROP_CACHE_PERIOD_LOOKBACK)) {
- properties.put(PROP_CACHE_PERIOD_LOOKBACK, MapUtils.getString(yamlConfig, PROP_CACHE_PERIOD_LOOKBACK));
- }
- }
-
- // Set the default detection window if it is not specified.
- // Here instead of using data granularity we use the detection period to set the default window size.
- private void setDefaultDetectionWindow(Map<String, Object> properties, String detectorType) {
- if (MOVING_WINDOW_DETECTOR_TYPES.contains(detectorType)) {
- properties.put(PROP_MOVING_WINDOW_DETECTION, true);
- org.joda.time.Period detectionPeriod =
- org.joda.time.Period.parse(MapUtils.getString(properties, PROP_BUCKET_PERIOD));
- int days = detectionPeriod.toStandardDays().getDays();
- int hours = detectionPeriod.toStandardHours().getHours();
- int minutes = detectionPeriod.toStandardMinutes().getMinutes();
- if (days >= 1) {
- properties.put(PROP_WINDOW_SIZE, 1);
- properties.put(PROP_WINDOW_UNIT, TimeUnit.DAYS);
- } else if (hours >= 1) {
- properties.put(PROP_WINDOW_SIZE, 24);
- properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
- } else if (minutes >= 1) {
- properties.put(PROP_WINDOW_SIZE, 6);
- properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
- properties.put(PROP_FREQUENCY, new TimeGranularity(15, TimeUnit.MINUTES));
- } else {
- properties.put(PROP_WINDOW_SIZE, 6);
- properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
- }
- }
- }
-
- private List<Map<String, Object>> buildFilterWrapperProperties(String metricUrn, String wrapperClassName,
- Map<String, Object> yamlConfig, List<Map<String, Object>> nestedProperties) {
- if (yamlConfig == null || yamlConfig.isEmpty()) {
- return nestedProperties;
- }
- Map<String, Object> wrapperProperties = buildWrapperProperties(wrapperClassName, nestedProperties);
- if (wrapperProperties.isEmpty()) {
- return Collections.emptyList();
- }
- String name = MapUtils.getString(yamlConfig, PROP_NAME);
- String filterType = MapUtils.getString(yamlConfig, PROP_TYPE);
- String filterRefKey = makeComponentRefKey(filterType, name);
- wrapperProperties.put(PROP_FILTER, filterRefKey);
- buildComponentSpec(metricUrn, yamlConfig, filterType, filterRefKey);
-
- return Collections.singletonList(wrapperProperties);
- }
-
- private Map<String, Object> buildWrapperProperties(String wrapperClassName,
- List<Map<String, Object>> nestedProperties) {
- return buildWrapperProperties(wrapperClassName, nestedProperties, Collections.emptyMap());
- }
-
- private Map<String, Object> buildWrapperProperties(String wrapperClassName,
- List<Map<String, Object>> nestedProperties, Map<String, Object> defaultProperties) {
- Map<String, Object> properties = new HashMap<>();
- List<Map<String, Object>> wrapperNestedProperties = new ArrayList<>();
- for (Map<String, Object> nested : nestedProperties) {
- if (nested != null && !nested.isEmpty()) {
- wrapperNestedProperties.add(nested);
- }
- }
- if (wrapperNestedProperties.isEmpty()) {
- return properties;
- }
- properties.put(PROP_CLASS_NAME, wrapperClassName);
- properties.put(PROP_NESTED, wrapperNestedProperties);
- properties.putAll(defaultProperties);
- return properties;
- }
-
- private void buildComponentSpec(String metricUrn, Map<String, Object> yamlConfig, String type, String componentRefKey) {
- Map<String, Object> componentSpecs = new HashMap<>();
-
- String componentClassName = DETECTION_REGISTRY.lookup(type);
- componentSpecs.put(PROP_CLASS_NAME, componentClassName);
- componentSpecs.put(PROP_METRIC_URN, metricUrn);
-
- Map<String, Object> params = ConfigUtils.getMap(yamlConfig.get(PROP_PARAMS));
-
- // For tunable components, the model params are computed from user supplied yaml params and previous model params.
- // We store the yaml params under a separate key, PROP_YAML_PARAMS, to distinguish from model params.
- if (DETECTION_REGISTRY.isTunable(componentClassName)) {
- componentSpecs.put(PROP_YAML_PARAMS, params);
- } else {
- componentSpecs.putAll(params);
- }
-
- this.components.put(DetectionUtils.getComponentKey(componentRefKey), componentSpecs);
- }
-
- private String makeComponentRefKey(String type, String name) {
- return "$" + name + ":" + type;
+ return generateDetectionConfig(yamlConfigMap, detectionProperties, qualityProperties, cron);
}
/**
* Fill in common fields of detection config. Properties of the pipeline is filled by the subclass.
*/
- private DetectionConfigDTO generateDetectionConfig(Map<String, Object> yamlConfigMap, Map<String, Object> properties,
- Map<String, Object> components, String cron) {
+ private DetectionConfigDTO generateDetectionConfig(Map<String, Object> yamlConfigMap,
+ Map<String, Object> detectionProperties, Map<String, Object> qualityProperties, String cron) {
DetectionConfigDTO config = new DetectionConfigDTO();
config.setName(MapUtils.getString(yamlConfigMap, PROP_DETECTION_NAME));
config.setDescription(MapUtils.getString(yamlConfigMap, PROP_DESC_NAME));
config.setLastTimestamp(System.currentTimeMillis());
config.setOwners(filterOwners(ConfigUtils.getList(yamlConfigMap.get(PROP_OWNERS))));
- config.setProperties(properties);
- config.setComponentSpecs(components);
+ config.setProperties(detectionProperties);
+ config.setDataQualityProperties(qualityProperties);
+ config.setComponentSpecs(this.metricAttributesMap.getAllComponenets());
config.setCron(cron);
config.setActive(MapUtils.getBooleanValue(yamlConfigMap, PROP_ACTIVE, true));
config.setYaml(yamlConfig);
//TODO: data-availability trigger is enabled only for detections running on PINOT daily datasets
+ List<DatasetConfigDTO> datasetConfigs = this.metricAttributesMap.getAllDatasets();
if (MapUtils.getString(yamlConfigMap, PROP_CRON) == null
&& datasetConfigs.stream().allMatch(c -> c.bucketTimeGranularity().getUnit().equals(TimeUnit.DAYS))
&& datasetConfigs.stream().allMatch(c -> c.getDataSource().equals(PinotThirdEyeDataSource.DATA_SOURCE_NAME))) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java
index 8d290e8..768e6d0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionMetricAttributeHolder.java
@@ -19,17 +19,14 @@
package org.apache.pinot.thirdeye.detection.yaml.translator;
-import com.google.common.base.Preconditions;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.apache.pinot.thirdeye.common.time.TimeGranularity;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.pojo.DatasetConfigBean;
import org.apache.pinot.thirdeye.detection.DataProvider;
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
@@ -39,7 +36,7 @@ import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
/**
* A data holder to store the processed information per metric
*/
-class DetectionMetricAttributeHolder {
+public class DetectionMetricAttributeHolder {
private static final String PROP_METRIC = "metric";
private static final String PROP_DATASET = "dataset";
@@ -47,6 +44,8 @@ class DetectionMetricAttributeHolder {
private final Map<String, DetectionMetricProperties> metricAttributesMap = new HashMap<>();
private final DataProvider dataProvider;
+ private final List<DatasetConfigDTO> datasetConfigs = new ArrayList<>();
+ private final Map<String, Object> components = new HashMap<>();
DetectionMetricAttributeHolder(DataProvider provider) {
this.dataProvider = provider;
@@ -62,6 +61,7 @@ class DetectionMetricAttributeHolder {
}
DatasetConfigDTO datasetConfig = fetchDatasetConfigDTO(this.dataProvider, datasetName);
+ datasetConfigs.add(datasetConfig);
MetricConfigDTO metricConfig = this.dataProvider.fetchMetric(metricName, datasetConfig.getDataset());
@@ -72,15 +72,19 @@ class DetectionMetricAttributeHolder {
return metricAliasKey;
}
- DatasetConfigDTO fetchDataset(Map<String, Object> metricAlertConfigMap) {
+ public List<DatasetConfigDTO> getAllDatasets() {
+ return datasetConfigs;
+ }
+
+ public DatasetConfigDTO fetchDataset(Map<String, Object> metricAlertConfigMap) {
return metricAttributesMap.get(loadMetricCache(metricAlertConfigMap)).getDatasetConfigDTO();
}
- MetricConfigDTO fetchMetric(Map<String, Object> metricAlertConfigMap) {
+ public MetricConfigDTO fetchMetric(Map<String, Object> metricAlertConfigMap) {
return metricAttributesMap.get(loadMetricCache(metricAlertConfigMap)).getMetricConfigDTO();
}
- String fetchCron(Map<String, Object> metricAlertConfigMap) {
+ public String fetchCron(Map<String, Object> metricAlertConfigMap) {
return metricAttributesMap.get(loadMetricCache(metricAlertConfigMap)).getCron();
}
@@ -101,4 +105,13 @@ class DetectionMetricAttributeHolder {
return "0 0 0 * * ?";
}
}
+
+ public void addComponent(String componentKey, Map<String, Object> componentSpecs) {
+ components.put(componentKey, componentSpecs);
+ }
+
+ public Map<String, Object> getAllComponenets() {
+ return components;
+ }
+
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DataQualityPropertiesBuilder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DataQualityPropertiesBuilder.java
new file mode 100644
index 0000000..897a1f7
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DataQualityPropertiesBuilder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
+
+
+/**
+ * This class is responsible for translating the data quality properties
+ */
+public class DataQualityPropertiesBuilder extends DetectionConfigPropertiesBuilder {
+
+ static final String PROP_QUALITY_CHECK = "qualityCheck";
+
+ public DataQualityPropertiesBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider provider) {
+ super(metricAttributesMap, provider);
+ }
+
+ /**
+ * Constructs the data quality properties mapping by translating the quality yaml
+ *
+ * @param metricAlertConfigMap holds the parsed yaml for a single metric alert
+ */
+ @Override
+ public Map<String, Object> buildMetricAlertProperties(Map<String, Object> metricAlertConfigMap) {
+ Map<String, Object> properties = new HashMap<>();
+ MetricConfigDTO metricConfigDTO = metricAttributesMap.fetchMetric(metricAlertConfigMap);
+
+ String subEntityName = MapUtils.getString(metricAlertConfigMap, PROP_NAME);
+ Map<String, Object> mergerProperties = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_MERGER));
+
+ Map<String, Collection<String>> dimensionFiltersMap = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_FILTERS));
+ String metricUrn = MetricEntity.fromMetric(dimensionFiltersMap, metricConfigDTO.getId()).getUrn();
+
+ // Translate all the rules
+ List<Map<String, Object>> ruleYamls = getList(metricAlertConfigMap.get(PROP_RULES));
+ List<Map<String, Object>> nestedPipelines = new ArrayList<>();
+ for (Map<String, Object> ruleYaml : ruleYamls) {
+ List<Map<String, Object>> qualityYamls = ConfigUtils.getList(ruleYaml.get(PROP_QUALITY));
+ if (qualityYamls.isEmpty()) {
+ continue;
+ }
+ List<Map<String, Object>> qualityProperties = buildListOfDataQualityProperties(
+ subEntityName, metricUrn, qualityYamls, mergerProperties);
+ nestedPipelines.addAll(qualityProperties);
+ }
+ if (nestedPipelines.isEmpty()) {
+ // No data quality rules
+ return properties;
+ }
+
+ properties.putAll(buildWrapperProperties(DataQualityMergeWrapper.class.getName(), nestedPipelines, mergerProperties));
+ return properties;
+ }
+
+ @Override
+ public Map<String, Object> buildCompositeAlertProperties(Map<String, Object> compositeAlertConfigMap) {
+ Map<String, Object> properties = new HashMap<>();
+
+ // Recursively translate all the sub-alerts
+ List<Map<String, Object>> subDetectionYamls = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_ALERTS));
+ List<Map<String, Object>> nestedPropertiesList = new ArrayList<>();
+ for (Map<String, Object> subDetectionYaml : subDetectionYamls) {
+ Map<String, Object> subProps;
+ if (subDetectionYaml.containsKey(PROP_TYPE) && subDetectionYaml.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
+ subProps = buildCompositeAlertProperties(subDetectionYaml);
+ } else {
+ subProps = buildMetricAlertProperties(subDetectionYaml);
+ }
+
+ nestedPropertiesList.add(subProps);
+ }
+ if (nestedPropertiesList.isEmpty()) {
+ return properties;
+ }
+
+ properties.putAll(compositePropertyBuilderHelper(nestedPropertiesList, compositeAlertConfigMap));
+ return properties;
+ }
+
+ private List<Map<String, Object>> buildListOfDataQualityProperties(String subEntityName, String metricUrn,
+ List<Map<String, Object>> yamlConfigs, Map<String, Object> mergerProperties) {
+ List<Map<String, Object>> properties = new ArrayList<>();
+ for (Map<String, Object> yamlConfig : yamlConfigs) {
+ properties.add(buildDataQualityWrapperProperties(subEntityName, metricUrn, yamlConfig, mergerProperties));
+ }
+ return properties;
+ }
+
+ private Map<String, Object> buildDataQualityWrapperProperties(String subEntityName, String metricUrn,
+ Map<String, Object> yamlConfig, Map<String, Object> mergerProperties) {
+ String qualityType = MapUtils.getString(yamlConfig, PROP_TYPE);
+ String name = MapUtils.getString(yamlConfig, PROP_NAME);
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(PROP_CLASS_NAME, DataSlaWrapper.class.getName());
+ properties.put(PROP_SUB_ENTITY_NAME, subEntityName);
+ properties.put(PROP_METRIC_URN, metricUrn);
+
+ String qualityRefKey = makeComponentRefKey(qualityType, name);
+ properties.put(PROP_QUALITY_CHECK, qualityRefKey);
+
+ buildComponentSpec(metricUrn, yamlConfig, qualityType, qualityRefKey);
+
+ properties.putAll(mergerProperties);
+ return properties;
+ }
+}
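Putting the builder above together with the DataSlaWrapper and DataQualityMergeWrapper it references, a single quality rule translates into roughly the nested properties map sketched below. The concrete values (alert name, metric URN, rule name and type) are illustrative placeholders, not output copied from ThirdEye.

```
import java.util.List;
import java.util.Map;

/** Rough shape of the data-quality properties emitted for one quality rule (values are placeholders). */
public class QualityPropertiesShapeSketch {
  public static void main(String[] args) {
    Map<String, Object> slaWrapperProps = Map.of(
        "className", "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
        "subEntityName", "myMetricAlert",
        "metricUrn", "thirdeye:metric:123",
        "qualityCheck", "$mySlaRule:DATA_SLA");   // "$<name>:<type>" ref key into the component specs

    Map<String, Object> qualityProperties = Map.of(
        "className", "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
        "nested", List.of(slaWrapperProps));

    System.out.println(qualityProperties);
  }
}
```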
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigPropertiesBuilder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigPropertiesBuilder.java
new file mode 100644
index 0000000..ee7d626
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigPropertiesBuilder.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
+
+
+/**
+ * This is the root of the detection config builder. Other translators extend from this class.
+ */
+public abstract class DetectionConfigPropertiesBuilder {
+
+ public static final String PROP_SUB_ENTITY_NAME = "subEntityName";
+ public static final String PROP_DIMENSION_EXPLORATION = "dimensionExploration";
+ public static final String PROP_FILTERS = "filters";
+
+ static final String PROP_DETECTION = "detection";
+ static final String PROP_FILTER = "filter";
+ static final String PROP_QUALITY = "quality";
+ static final String PROP_TYPE = "type";
+ static final String PROP_CLASS_NAME = "className";
+ static final String PROP_PARAMS = "params";
+ static final String PROP_METRIC_URN = "metricUrn";
+ static final String PROP_DIMENSION_FILTER_METRIC = "dimensionFilterMetric";
+ static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
+ static final String PROP_RULES = "rules";
+ static final String PROP_GROUPER = "grouper";
+ static final String PROP_NESTED = "nested";
+ static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
+ static final String PROP_DETECTOR = "detector";
+ static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
+ static final String PROP_WINDOW_DELAY = "windowDelay";
+ static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+ static final String PROP_WINDOW_SIZE = "windowSize";
+ static final String PROP_WINDOW_UNIT = "windowUnit";
+ static final String PROP_FREQUENCY = "frequency";
+ static final String PROP_MERGER = "merger";
+ static final String PROP_TIMEZONE = "timezone";
+ static final String PROP_NAME = "name";
+
+ static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
+ static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+ static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
+
+ static final String PROP_ALERTS = "alerts";
+ static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
+
+ final DetectionMetricAttributeHolder metricAttributesMap;
+ final DataProvider dataProvider;
+ static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
+
+ DetectionConfigPropertiesBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider dataProvider) {
+ this.metricAttributesMap = metricAttributesMap;
+ this.dataProvider = dataProvider;
+ }
+
+ public abstract Map<String, Object> buildMetricAlertProperties(Map<String, Object> yamlConfigMap) throws IllegalArgumentException;
+
+ public abstract Map<String, Object> buildCompositeAlertProperties(Map<String, Object> yamlConfigMap) throws IllegalArgumentException;
+
+ Map<String, Object> buildDimensionWrapperProperties(Map<String, Object> yamlConfigMap,
+ Map<String, Collection<String>> dimensionFilters, String metricUrn, String datasetName) {
+ Map<String, Object> dimensionWrapperProperties = new HashMap<>();
+ dimensionWrapperProperties.put(PROP_NESTED_METRIC_URNS, Collections.singletonList(metricUrn));
+ if (yamlConfigMap.containsKey(PROP_DIMENSION_EXPLORATION)) {
+ Map<String, Object> dimensionExploreYaml = ConfigUtils.getMap(yamlConfigMap.get(PROP_DIMENSION_EXPLORATION));
+ dimensionWrapperProperties.putAll(dimensionExploreYaml);
+ if (dimensionExploreYaml.containsKey(PROP_DIMENSION_FILTER_METRIC)){
+ MetricConfigDTO dimensionExploreMetric = this.dataProvider.fetchMetric(MapUtils.getString(dimensionExploreYaml, PROP_DIMENSION_FILTER_METRIC), datasetName);
+ dimensionWrapperProperties.put(PROP_METRIC_URN, MetricEntity.fromMetric(dimensionFilters, dimensionExploreMetric.getId()).getUrn());
+ } else {
+ dimensionWrapperProperties.put(PROP_METRIC_URN, metricUrn);
+ }
+ }
+ return dimensionWrapperProperties;
+ }
+
+ Map<String, Object> buildGroupWrapperProperties(String entityName, Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
+ return buildGroupWrapperProperties(entityName, null, grouperYaml, nestedProps);
+ }
+
+ Map<String, Object> buildGroupWrapperProperties(String entityName, String metricUrn,
+ Map<String, Object> grouperYaml, List<Map<String, Object>> nestedProps) {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(PROP_CLASS_NAME, GrouperWrapper.class.getName());
+ properties.put(PROP_NESTED, nestedProps);
+ properties.put(PROP_SUB_ENTITY_NAME, entityName);
+
+ String grouperType = MapUtils.getString(grouperYaml, PROP_TYPE);
+ String grouperName = MapUtils.getString(grouperYaml, PROP_NAME);
+ String grouperRefKey = makeComponentRefKey(grouperType, grouperName);
+ properties.put(PROP_GROUPER, grouperRefKey);
+
+ buildComponentSpec(metricUrn, grouperYaml, grouperType, grouperRefKey);
+
+ return properties;
+ }
+
+ List<Map<String, Object>> buildFilterWrapperProperties(String metricUrn, String wrapperClassName,
+ Map<String, Object> yamlConfig, List<Map<String, Object>> nestedProperties) {
+ if (yamlConfig == null || yamlConfig.isEmpty()) {
+ return nestedProperties;
+ }
+ Map<String, Object> wrapperProperties = buildWrapperProperties(wrapperClassName, nestedProperties);
+ if (wrapperProperties.isEmpty()) {
+ return Collections.emptyList();
+ }
+ String name = MapUtils.getString(yamlConfig, PROP_NAME);
+ String filterType = MapUtils.getString(yamlConfig, PROP_TYPE);
+ String filterRefKey = makeComponentRefKey(filterType, name);
+ wrapperProperties.put(PROP_FILTER, filterRefKey);
+ buildComponentSpec(metricUrn, yamlConfig, filterType, filterRefKey);
+
+ return Collections.singletonList(wrapperProperties);
+ }
+
+ void buildComponentSpec(String metricUrn, Map<String, Object> yamlConfig, String type, String componentRefKey) {
+ Map<String, Object> componentSpecs = new HashMap<>();
+
+ String componentClassName = DETECTION_REGISTRY.lookup(type);
+ componentSpecs.put(PROP_CLASS_NAME, componentClassName);
+ if (metricUrn != null) {
+ componentSpecs.put(PROP_METRIC_URN, metricUrn);
+ }
+
+ Map<String, Object> params = ConfigUtils.getMap(yamlConfig.get(PROP_PARAMS));
+
+ // For tunable components, the model params are computed from user supplied yaml params and previous model params.
+ // We store the yaml params under a separate key, PROP_YAML_PARAMS, to distinguish from model params.
+ if (DETECTION_REGISTRY.isTunable(componentClassName)) {
+ componentSpecs.put(PROP_YAML_PARAMS, params);
+ } else {
+ componentSpecs.putAll(params);
+ }
+
+ metricAttributesMap.addComponent(DetectionUtils.getComponentKey(componentRefKey), componentSpecs);
+ }
+
+ Map<String, Object> compositePropertyBuilderHelper(List<Map<String, Object>> nestedPropertiesList,
+ Map<String, Object> compositeAlertConfigMap) {
+ Map<String, Object> properties;
+ String subEntityName = MapUtils.getString(compositeAlertConfigMap, PROP_NAME);
+
+ // Wrap the entity level grouper, only 1 grouper is supported now
+ List<Map<String, Object>> grouperProps = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_GROUPER));
+ Map<String, Object> mergerProperties = ConfigUtils.getMap(compositeAlertConfigMap.get(PROP_MERGER));
+ if (!grouperProps.isEmpty()) {
+ properties = buildWrapperProperties(
+ EntityAnomalyMergeWrapper.class.getName(),
+ Collections.singletonList(buildGroupWrapperProperties(subEntityName, grouperProps.get(0), nestedPropertiesList)),
+ mergerProperties);
+ nestedPropertiesList = Collections.singletonList(properties);
+ }
+
+ return buildWrapperProperties(
+ ChildKeepingMergeWrapper.class.getName(),
+ nestedPropertiesList,
+ mergerProperties);
+ }
+
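+ // Wraps the non-empty nested properties under the given wrapper class, optionally merging in default properties.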
+ Map<String, Object> buildWrapperProperties(String wrapperClassName,
+ List<Map<String, Object>> nestedProperties) {
+ return buildWrapperProperties(wrapperClassName, nestedProperties, Collections.emptyMap());
+ }
+
+ Map<String, Object> buildWrapperProperties(String wrapperClassName,
+ List<Map<String, Object>> nestedProperties, Map<String, Object> defaultProperties) {
+ Map<String, Object> properties = new HashMap<>();
+ List<Map<String, Object>> wrapperNestedProperties = new ArrayList<>();
+ for (Map<String, Object> nested : nestedProperties) {
+ if (nested != null && !nested.isEmpty()) {
+ wrapperNestedProperties.add(nested);
+ }
+ }
+ if (wrapperNestedProperties.isEmpty()) {
+ return properties;
+ }
+ properties.put(PROP_CLASS_NAME, wrapperClassName);
+ properties.put(PROP_NESTED, wrapperNestedProperties);
+ properties.putAll(defaultProperties);
+ return properties;
+ }
+
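+ // Builds the key used to reference a component, e.g. type "DATA_SLA" and name "slaRule1" yield "$slaRule1:DATA_SLA".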
+ static String makeComponentRefKey(String type, String name) {
+ return "$" + name + ":" + type;
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionPropertiesBuilder.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionPropertiesBuilder.java
new file mode 100644
index 0000000..664e40c
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionPropertiesBuilder.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.algorithm.DimensionWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.AnomalyFilterWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.BaselineFillingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.ConfigUtils.*;
+
+
+/**
+ * This class is responsible for translating the detection properties
+ */
+public class DetectionPropertiesBuilder extends DetectionConfigPropertiesBuilder {
+
+ private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM");
+
+ public DetectionPropertiesBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider provider) {
+ super(metricAttributesMap, provider);
+ }
+
+ /**
+ * Constructs the detection properties mapping by translating the detection & filter yaml
+ *
+ * @param metricAlertConfigMap holds the parsed yaml for a single metric alert
+ */
+ @Override
+ public Map<String, Object> buildMetricAlertProperties(Map<String, Object> metricAlertConfigMap) {
+ MetricConfigDTO metricConfigDTO = metricAttributesMap.fetchMetric(metricAlertConfigMap);
+ DatasetConfigDTO datasetConfigDTO = metricAttributesMap.fetchDataset(metricAlertConfigMap);
+
+ String subEntityName = MapUtils.getString(metricAlertConfigMap, PROP_NAME);
+ Map<String, Object> mergerProperties = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_MERGER));
+ Map<String, Collection<String>> dimensionFiltersMap = ConfigUtils.getMap(metricAlertConfigMap.get(PROP_FILTERS));
+ String metricUrn = MetricEntity.fromMetric(dimensionFiltersMap, metricConfigDTO.getId()).getUrn();
+
+ // Translate all the rules
+ List<Map<String, Object>> ruleYamls = getList(metricAlertConfigMap.get(PROP_RULES));
+ List<Map<String, Object>> nestedPipelines = new ArrayList<>();
+ for (Map<String, Object> ruleYaml : ruleYamls) {
+ List<Map<String, Object>> detectionYamls = ConfigUtils.getList(ruleYaml.get(PROP_DETECTION));
+ List<Map<String, Object>> detectionProperties = buildListOfMergeWrapperProperties(
+ subEntityName, metricUrn, detectionYamls, mergerProperties, datasetConfigDTO.bucketTimeGranularity());
+
+ List<Map<String, Object>> filterYamls = ConfigUtils.getList(ruleYaml.get(PROP_FILTER));
+ if (filterYamls.isEmpty()) {
+ nestedPipelines.addAll(detectionProperties);
+ } else {
+ List<Map<String, Object>> filterNestedProperties = detectionProperties;
+ for (Map<String, Object> filterProperties : filterYamls) {
+ filterNestedProperties = buildFilterWrapperProperties(metricUrn, AnomalyFilterWrapper.class.getName(), filterProperties,
+ filterNestedProperties);
+ }
+ nestedPipelines.addAll(filterNestedProperties);
+ }
+ }
+
+ // Wrap with dimension exploration properties
+ Map<String, Object> dimensionWrapperProperties = buildDimensionWrapperProperties(
+ metricAlertConfigMap, dimensionFiltersMap, metricUrn, datasetConfigDTO.getDataset());
+ Map<String, Object> properties = buildWrapperProperties(
+ ChildKeepingMergeWrapper.class.getName(),
+ Collections.singletonList(buildWrapperProperties(DimensionWrapper.class.getName(), nestedPipelines, dimensionWrapperProperties)),
+ mergerProperties);
+
+ // Wrap with the metric-level grouper; only one grouper is currently supported
+ List<Map<String, Object>> grouperYamls = getList(metricAlertConfigMap.get(PROP_GROUPER));
+ if (!grouperYamls.isEmpty()) {
+ properties = buildWrapperProperties(
+ EntityAnomalyMergeWrapper.class.getName(),
+ Collections.singletonList(buildGroupWrapperProperties(subEntityName, metricUrn, grouperYamls.get(0), Collections.singletonList(properties))),
+ mergerProperties);
+
+ properties = buildWrapperProperties(
+ ChildKeepingMergeWrapper.class.getName(),
+ Collections.singletonList(properties),
+ mergerProperties);
+ }
+
+ return properties;
+ }
+
+ @Override
+ public Map<String, Object> buildCompositeAlertProperties(Map<String, Object> compositeAlertConfigMap) {
+ // Recursively translate all the sub-alerts
+ List<Map<String, Object>> subDetectionYamls = ConfigUtils.getList(compositeAlertConfigMap.get(PROP_ALERTS));
+ List<Map<String, Object>> nestedPropertiesList = new ArrayList<>();
+ for (Map<String, Object> subDetectionYaml : subDetectionYamls) {
+ Map<String, Object> subProps;
+ if (subDetectionYaml.containsKey(PROP_TYPE) && subDetectionYaml.get(PROP_TYPE).equals(COMPOSITE_ALERT)) {
+ subProps = buildCompositeAlertProperties(subDetectionYaml);
+ } else {
+ subProps = buildMetricAlertProperties(subDetectionYaml);
+ }
+
+ nestedPropertiesList.add(subProps);
+ }
+
+ return compositePropertyBuilderHelper(nestedPropertiesList, compositeAlertConfigMap);
+ }
+
+ private List<Map<String, Object>> buildListOfMergeWrapperProperties(String subEntityName, String metricUrn,
+ List<Map<String, Object>> yamlConfigs, Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
+ List<Map<String, Object>> properties = new ArrayList<>();
+ for (Map<String, Object> yamlConfig : yamlConfigs) {
+ properties.add(buildMergeWrapperProperties(subEntityName, metricUrn, yamlConfig, mergerProperties, datasetTimegranularity));
+ }
+ return properties;
+ }
+
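+ // Builds a BaselineFillingMergeWrapper around an AnomalyDetectorWrapper for a single detection rule and wires in the detector and baseline provider components.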
+ private Map<String, Object> buildMergeWrapperProperties(String subEntityName, String metricUrn, Map<String, Object> yamlConfig,
+ Map<String, Object> mergerProperties, TimeGranularity datasetTimegranularity) {
+ String detectorType = MapUtils.getString(yamlConfig, PROP_TYPE);
+ String name = MapUtils.getString(yamlConfig, PROP_NAME);
+ Map<String, Object> nestedProperties = new HashMap<>();
+ nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
+ nestedProperties.put(PROP_SUB_ENTITY_NAME, subEntityName);
+ String detectorRefKey = makeComponentRefKey(detectorType, name);
+
+ fillInDetectorWrapperProperties(nestedProperties, yamlConfig, detectorType, datasetTimegranularity);
+
+ buildComponentSpec(metricUrn, yamlConfig, detectorType, detectorRefKey);
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(PROP_CLASS_NAME, BaselineFillingMergeWrapper.class.getName());
+ properties.put(PROP_NESTED, Collections.singletonList(nestedProperties));
+ properties.put(PROP_DETECTOR, detectorRefKey);
+
+ // fill in baseline provider properties
+ if (DETECTION_REGISTRY.isBaselineProvider(detectorType)) {
+ // if the detector implements the baseline provider interface, use it to generate baseline
+ properties.put(PROP_BASELINE_PROVIDER, detectorRefKey);
+ } else {
+ String baselineProviderType = DEFAULT_BASELINE_PROVIDER_YAML_TYPE;
+ String baselineProviderKey = makeComponentRefKey(baselineProviderType, name);
+ buildComponentSpec(metricUrn, yamlConfig, baselineProviderType, baselineProviderKey);
+ properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
+ }
+ properties.putAll(mergerProperties);
+ return properties;
+ }
+
+ // Fill in the window size and unit if the detector requires a moving window
+ private static void fillInDetectorWrapperProperties(Map<String, Object> properties, Map<String, Object> yamlConfig, String detectorType, TimeGranularity datasetTimegranularity) {
+ // set default bucketPeriod
+ properties.put(PROP_BUCKET_PERIOD, datasetTimegranularity.toPeriod().toString());
+
+ // Override bucketPeriod from the yaml config since it is needed to compute the detection window
+ if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
+ properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig, PROP_BUCKET_PERIOD));
+ }
+
+ // set default detection window
+ setDefaultDetectionWindow(properties, detectorType);
+
+ // override other properties from yaml
+ if (yamlConfig.containsKey(PROP_WINDOW_SIZE)) {
+ properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+ properties.put(PROP_WINDOW_SIZE, MapUtils.getString(yamlConfig, PROP_WINDOW_SIZE));
+ }
+ if (yamlConfig.containsKey(PROP_WINDOW_UNIT)) {
+ properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+ properties.put(PROP_WINDOW_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_UNIT));
+ }
+ if (yamlConfig.containsKey(PROP_WINDOW_DELAY)) {
+ properties.put(PROP_WINDOW_DELAY, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY));
+ }
+ if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
+ properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
+ }
+ if (yamlConfig.containsKey(PROP_TIMEZONE)){
+ properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
+ }
+ if (yamlConfig.containsKey(PROP_CACHE_PERIOD_LOOKBACK)) {
+ properties.put(PROP_CACHE_PERIOD_LOOKBACK, MapUtils.getString(yamlConfig, PROP_CACHE_PERIOD_LOOKBACK));
+ }
+ }
+
+ // Set the default detection window if it is not specified.
+ // The detection period (bucketPeriod), rather than the data granularity, determines the default window size.
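+ // Daily data defaults to a 1 DAY window, hourly data to 24 HOURS, and minute-level data to 6 HOURS with a 15-minute detection frequency.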
+ private static void setDefaultDetectionWindow(Map<String, Object> properties, String detectorType) {
+ if (MOVING_WINDOW_DETECTOR_TYPES.contains(detectorType)) {
+ properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+ org.joda.time.Period detectionPeriod =
+ org.joda.time.Period.parse(MapUtils.getString(properties, PROP_BUCKET_PERIOD));
+ int days = detectionPeriod.toStandardDays().getDays();
+ int hours = detectionPeriod.toStandardHours().getHours();
+ int minutes = detectionPeriod.toStandardMinutes().getMinutes();
+ if (days >= 1) {
+ properties.put(PROP_WINDOW_SIZE, 1);
+ properties.put(PROP_WINDOW_UNIT, TimeUnit.DAYS);
+ } else if (hours >= 1) {
+ properties.put(PROP_WINDOW_SIZE, 24);
+ properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+ } else if (minutes >= 1) {
+ properties.put(PROP_WINDOW_SIZE, 6);
+ properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+ properties.put(PROP_FREQUENCY, new TimeGranularity(15, TimeUnit.MINUTES));
+ } else {
+ properties.put(PROP_WINDOW_SIZE, 6);
+ properties.put(PROP_WINDOW_UNIT, TimeUnit.HOURS);
+ }
+ }
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java
index b64075e..127b8a8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/formatter/DetectionConfigFormatter.java
@@ -46,15 +46,12 @@ import org.apache.pinot.thirdeye.datasource.DAORegistry;
import org.apache.pinot.thirdeye.detection.ConfigUtils;
import org.apache.pinot.thirdeye.detection.health.DetectionHealth;
import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.PeriodType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import static org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator.*;
-import static org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator.*;
+import static org.apache.pinot.thirdeye.detection.yaml.translator.builder.DetectionConfigPropertiesBuilder.*;
/**
@@ -84,9 +81,6 @@ public class DetectionConfigFormatter implements DTOFormatter<DetectionConfigDTO
private static final String PROP_NESTED_PROPERTIES_KEY = "nested";
private static final String PROP_MONITORING_GRANULARITY = "monitoringGranularity";
private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
- private static final String PROP_VARIABLES_BUCKET_PERIOD = "variables.bucketPeriod";
-
- private static final Logger LOG = LoggerFactory.getLogger(DetectionConfigFormatter.class);
private static final long DEFAULT_PRESENTING_WINDOW_SIZE_MINUTELY = TimeUnit.HOURS.toMillis(48);
private static final long DEFAULT_PRESENTING_WINDOW_SIZE_DAILY = TimeUnit.DAYS.toMillis(30);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java
index f1174ce..8d9b1f4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/notification/content/BaseNotificationContent.java
@@ -318,7 +318,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
String liftValue = String.format(PERCENTAGE_FORMAT, lift * 100);
// Fetch the lift value for a SLA anomaly
- if (anomaly.getType().equals(AnomalyType.DATA_MISSING)) {
+ if (anomaly.getType().equals(AnomalyType.DATA_SLA)) {
liftValue = getFormattedSLALiftValue(anomaly);
}
@@ -329,7 +329,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
* The lift value for an SLA anomaly is the delay from the configured sla. (Ex: 2 days & 3 hours)
*/
protected static String getFormattedSLALiftValue(MergedAnomalyResultDTO anomaly) {
- if (!anomaly.getType().equals(AnomalyType.DATA_MISSING)
+ if (!anomaly.getType().equals(AnomalyType.DATA_SLA)
|| anomaly.getProperties() == null || anomaly.getProperties().isEmpty()
|| !anomaly.getProperties().containsKey("sla")
|| !anomaly.getProperties().containsKey("datasetLastRefreshTime")) {
@@ -357,7 +357,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
* The predicted value for an SLA anomaly is the configured sla. (Ex: 2_DAYS)
*/
protected static String getSLAPredictedValue(MergedAnomalyResultDTO anomaly) {
- if (!anomaly.getType().equals(AnomalyType.DATA_MISSING)
+ if (!anomaly.getType().equals(AnomalyType.DATA_SLA)
|| anomaly.getProperties() == null || anomaly.getProperties().isEmpty()
|| !anomaly.getProperties().containsKey("sla")) {
return "-";
@@ -373,7 +373,7 @@ public abstract class BaseNotificationContent implements NotificationContent {
String predicted = ThirdEyeUtils.getRoundedValue(anomaly.getAvgBaselineVal());
// For SLA anomalies, we use the sla as the predicted value
- if (anomaly.getType().equals(AnomalyType.DATA_MISSING)) {
+ if (anomaly.getType().equals(AnomalyType.DATA_SLA)) {
predicted = getSLAPredictedValue(anomaly);
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
index 271eedb..216784a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
@@ -20,6 +20,7 @@
package org.apache.pinot.thirdeye.scheduler;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
@@ -31,7 +32,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.thirdeye.datalayer.pojo.AbstractBean;
import org.apache.pinot.thirdeye.datalayer.pojo.DetectionConfigBean;
-import org.apache.pinot.thirdeye.detection.DetectionDataSLAJob;
+import org.apache.pinot.thirdeye.detection.dataquality.DataQualityPipelineJob;
import org.apache.pinot.thirdeye.detection.DetectionPipelineJob;
import org.apache.pinot.thirdeye.detection.DetectionUtils;
import org.apache.pinot.thirdeye.detection.TaskUtils;
@@ -55,6 +56,7 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
public static final int DEFAULT_DETECTION_DELAY = 1;
public static final TimeUnit DEFAULT_ALERT_DELAY_UNIT = TimeUnit.MINUTES;
+ public static final String QUARTZ_DETECTION_GROUPER = TaskConstants.TaskType.DETECTION.toString();
final DetectionConfigManager detectionDAO;
final Scheduler scheduler;
@@ -79,28 +81,45 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
// add or update
for (DetectionConfigDTO config : configs) {
- JobKey key = new JobKey(getJobKey(config.getId()), TaskConstants.TaskType.DETECTION.toString());
if (!config.isActive()) {
- LOG.debug("Detection config " + key + " is inactive. Skipping.");
+ LOG.debug("Detection config " + config.getId() + " is inactive. Skipping.");
continue;
}
- if (DetectionUtils.isDataAvailabilityCheckEnabled(config)) {
- LOG.debug("Detection config " + key + " is enabled for data availability scheduling. Skipping.");
+ if (config.isDataAvailabilitySchedule()) {
+ LOG.debug("Detection config " + config.getId() + " is enabled for data availability scheduling. Skipping.");
continue;
}
+
try {
- if (scheduler.checkExists(key)) {
- LOG.info("Detection config " + key.getName() + " is already scheduled");
- if (isJobUpdated(config, key)) {
- // restart job
- stopJob(key);
- startJob(config, key);
+ // Schedule detection jobs
+ JobKey detectionJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DETECTION),
+ QUARTZ_DETECTION_GROUPER);
+ JobDetail detectionJob = JobBuilder.newJob(DetectionPipelineJob.class).withIdentity(detectionJobKey).build();
+ if (scheduler.checkExists(detectionJobKey)) {
+ LOG.info("Detection config " + detectionJobKey.getName() + " is already scheduled for detection");
+ if (isJobUpdated(config, detectionJobKey)) {
+ restartJob(config, detectionJob);
+ }
+ } else {
+ startJob(config, detectionJob);
+ }
+
+ // Schedule data quality jobs
+ JobKey dataQualityJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DATA_QUALITY),
+ QUARTZ_DETECTION_GROUPER);
+ JobDetail dataQualityJob = JobBuilder.newJob(DataQualityPipelineJob.class).withIdentity(dataQualityJobKey).build();
+ if (scheduler.checkExists(dataQualityJobKey)) {
+ LOG.info("Detection config " + dataQualityJobKey.getName() + " is already scheduled for data quality");
+ if (isJobUpdated(config, dataQualityJobKey)) {
+ restartJob(config, dataQualityJob);
}
} else {
- startJob(config, key);
+ if (DetectionUtils.isDataQualityCheckEnabled(config)) {
+ startJob(config, dataQualityJob);
+ }
}
} catch (Exception e) {
- LOG.error("Error creating/updating job key {}", key);
+ LOG.error("Error creating/updating job key for detection config {}", config.getId());
}
}
@@ -131,9 +150,14 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
}
}
+ private void restartJob(DetectionConfigDTO config, JobDetail job) throws SchedulerException {
+ stopJob(job.getKey());
+ startJob(config, job);
+ }
+
@Override
public Set<JobKey> getScheduledJobs() throws SchedulerException {
- return this.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(TaskConstants.TaskType.DETECTION.toString()));
+ return this.scheduler.getJobKeys(GroupMatcher.jobGroupEquals(QUARTZ_DETECTION_GROUPER));
}
@Override
@@ -143,20 +167,11 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
}
@Override
- public void startJob(AbstractBean config, JobKey key) throws SchedulerException {
+ public void startJob(AbstractBean config, JobDetail job) throws SchedulerException {
Trigger trigger = TriggerBuilder.newTrigger().withSchedule(
CronScheduleBuilder.cronSchedule(((DetectionConfigBean) config).getCron())).build();
- JobDetail detectionJob = JobBuilder.newJob(DetectionPipelineJob.class).withIdentity(key).build();
-
- this.scheduler.scheduleJob(detectionJob, trigger);
- LOG.info(String.format("scheduled detection pipeline job %s", detectionJob.getKey().getName()));
-
- // Data SLA alerts will be scheduled only when enabled by the user.
- if (DetectionUtils.isDataAvailabilityCheckEnabled((DetectionConfigDTO) config)) {
- JobDetail dataSLAJob = JobBuilder.newJob(DetectionDataSLAJob.class).withIdentity(key).build();
- this.scheduler.scheduleJob(dataSLAJob, trigger);
- LOG.info(String.format("scheduled data sla jobs %s", dataSLAJob.getKey().getName()));
- }
+ this.scheduler.scheduleJob(job, trigger);
+ LOG.info(String.format("scheduled detection pipeline job %s", job.getKey().getName()));
}
@Override
@@ -170,8 +185,8 @@ public class DetectionCronScheduler implements ThirdEyeCronScheduler {
}
@Override
- public String getJobKey(Long id) {
- return String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
+ public String getJobKey(Long id, TaskConstants.TaskType taskType) {
+ return String.format("%s_%d", taskType, id);
}
private boolean isJobUpdated(DetectionConfigDTO config, JobKey key) throws SchedulerException {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java
index a44d9a7..afac377 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/SubscriptionCronScheduler.java
@@ -59,6 +59,7 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
private static final int DEFAULT_ALERT_DELAY = 1;
private static final TimeUnit DEFAULT_ALERT_DELAY_UNIT = TimeUnit.MINUTES;
+ public static final String QUARTZ_SUBSCRIPTION_GROUPER = TaskConstants.TaskType.DETECTION_ALERT.toString();
final Scheduler scheduler;
private ScheduledExecutorService scheduledExecutorService;
@@ -111,7 +112,7 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
@Override
public Set<JobKey> getScheduledJobs() throws SchedulerException {
- return scheduler.getJobKeys(GroupMatcher.jobGroupEquals(TaskConstants.TaskType.DETECTION_ALERT.toString()));
+ return scheduler.getJobKeys(GroupMatcher.jobGroupEquals(QUARTZ_SUBSCRIPTION_GROUPER));
}
@Override
@@ -121,12 +122,11 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
}
@Override
- public void startJob(AbstractBean config, JobKey key) throws SchedulerException {
+ public void startJob(AbstractBean config, JobDetail job) throws SchedulerException {
Trigger trigger = TriggerBuilder.newTrigger().withSchedule(
CronScheduleBuilder.cronSchedule(((DetectionAlertConfigBean) config).getCronExpression())).build();
- JobDetail job = JobBuilder.newJob(DetectionAlertJob.class).withIdentity(key).build();
this.scheduler.scheduleJob(job, trigger);
- LOG.info(String.format("scheduled subscription pipeline job %s", key.getName()));
+ LOG.info(String.format("scheduled subscription pipeline job %s", job.getKey().getName()));
}
@Override
@@ -139,8 +139,8 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
}
@Override
- public String getJobKey(Long id) {
- return String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, id);
+ public String getJobKey(Long id, TaskConstants.TaskType taskType) {
+ return String.format("%s_%d", taskType, id);
}
private void deleteAlertJob(JobKey scheduledJobKey) throws SchedulerException {
@@ -157,7 +157,8 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
Long id = alertConfig.getId();
boolean isActive = alertConfig.isActive();
- JobKey key = new JobKey(getJobKey(id), TaskConstants.TaskType.DETECTION_ALERT.toString());
+ JobKey key = new JobKey(getJobKey(id, TaskConstants.TaskType.DETECTION_ALERT), QUARTZ_SUBSCRIPTION_GROUPER);
+ JobDetail job = JobBuilder.newJob(DetectionAlertJob.class).withIdentity(key).build();
boolean isScheduled = scheduledJobs.contains(key);
if (isActive) {
@@ -173,11 +174,11 @@ public class SubscriptionCronScheduler implements ThirdEyeCronScheduler {
"Cron expression for config {} with jobKey {} has been changed from {} to {}. " + "Restarting schedule",
id, key, cronInSchedule, cronInDatabase);
stopJob(key);
- startJob(alertConfig, key);
+ startJob(alertConfig, job);
}
} else {
LOG.info("Found active but not scheduled {}", id);
- startJob(alertConfig, key);
+ startJob(alertConfig, job);
}
} else {
if (isScheduled) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java
index eb3549c..62c3d88 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/ThirdEyeCronScheduler.java
@@ -23,7 +23,9 @@
package org.apache.pinot.thirdeye.scheduler;
import java.util.Set;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.datalayer.pojo.AbstractBean;
+import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
@@ -40,7 +42,7 @@ public interface ThirdEyeCronScheduler extends Runnable {
void shutdown() throws SchedulerException;
// Trigger the scheduler to start creating jobs.
- void startJob(AbstractBean config, JobKey key) throws SchedulerException;
+ void startJob(AbstractBean config, JobDetail key) throws SchedulerException;
// Stop the scheduler from scheduling more jobs.
void stopJob(JobKey key) throws SchedulerException;
@@ -49,5 +51,5 @@ public interface ThirdEyeCronScheduler extends Runnable {
Set<JobKey> getScheduledJobs() throws SchedulerException;
// Get the key for the scheduling job
- String getJobKey(Long id);
+ String getJobKey(Long id, TaskConstants.TaskType taskType);
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java
index 7861dec..ca2ef7c 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskSchedulerTest.java
@@ -241,7 +241,7 @@ public class DataAvailabilityTaskSchedulerTest {
Map<String, Object> metricSla = new HashMap<>();
Map<String, Object> props = new HashMap<>();
props.put("testMetricUrn", metricSla);
- detection.setDataSLAProperties(props);
+ detection.setDataQualityProperties(props);
long detection1 = detectionConfigDAO.save(detection);
long halfHourAgo = TEST_TIME - TimeUnit.MINUTES.toMillis(30);
createDataset(1, TEST_TIME, halfHourAgo);
@@ -258,7 +258,7 @@ public class DataAvailabilityTaskSchedulerTest {
jobNames.add(waitingTasks.get(0).getJobName());
jobNames.add(waitingTasks.get(1).getJobName());
Assert.assertTrue(jobNames.contains(TaskConstants.TaskType.DETECTION.toString() + "_" + detection1));
- Assert.assertTrue(jobNames.contains(TaskConstants.TaskType.DATA_SLA.toString() + "_" + detection1));
+ Assert.assertTrue(jobNames.contains(TaskConstants.TaskType.DATA_QUALITY.toString() + "_" + detection1));
}
@Test
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
index 364c80c..965d6fd 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
@@ -32,6 +32,7 @@ import org.apache.pinot.thirdeye.datalayer.dto.EvaluationDTO;
import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EvaluationSlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
@@ -217,6 +218,18 @@ public class MockDataProvider implements DataProvider {
return this.loader.from(this, config, start, end);
}
+ @Override
+ public List<DatasetConfigDTO> fetchDatasetByDisplayName(String datasetDisplayName) {
+ List<DatasetConfigDTO> datasetConfigDTOs = new ArrayList<>();
+ for (DatasetConfigDTO datasetConfigDTO : getDatasets()) {
+ if (datasetConfigDTO.getDisplayName().equals(datasetDisplayName)) {
+ datasetConfigDTOs.add(datasetConfigDTO);
+ }
+ }
+
+ return datasetConfigDTOs;
+ }
+
public Map<MetricSlice, DataFrame> getTimeseries() {
return timeseries;
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
new file mode 100644
index 0000000..963946d
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
@@ -0,0 +1,642 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AbstractDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+public class DataQualityTaskRunnerTest {
+ private DetectionPipelineTaskInfo info;
+ private TaskContext context;
+ private DAOTestBase testDAOProvider;
+ private DetectionConfigManager detectionDAO;
+ private DatasetConfigManager datasetDAO;
+ private MergedAnomalyResultManager anomalyDAO;
+ private EvaluationManager evaluationDAO;
+ private MetricConfigDTO metricConfigDTO;
+ private DatasetConfigDTO datasetConfigDTO;
+ private DetectionConfigDTO detectionConfigDTO;
+
+ private long detectorId;
+ private DetectionPipelineLoader loader;
+ private DataProvider provider;
+
+ private static final long GRANULARITY = TimeUnit.DAYS.toMillis(1);
+ private static final long START_TIME = new DateTime(System.currentTimeMillis(), DateTimeZone.forID("UTC"))
+ .withMillisOfDay(0).getMillis();
+
+ @BeforeMethod
+ public void beforeMethod() throws IOException {
+ this.testDAOProvider = DAOTestBase.getInstance();
+ this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
+ this.datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+ this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+ this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+ this.loader = new DetectionPipelineLoader();
+
+ DetectionRegistry.registerComponent(DataSlaQualityChecker.class.getName(), "DATA_SLA");
+
+ metricConfigDTO = new MetricConfigDTO();
+ metricConfigDTO.setId(123L);
+ metricConfigDTO.setName("thirdeye-test");
+ metricConfigDTO.setDataset("thirdeye-test-dataset");
+
+ datasetConfigDTO = new DatasetConfigDTO();
+ datasetConfigDTO.setId(124L);
+ datasetConfigDTO.setDataset("thirdeye-test-dataset");
+ datasetConfigDTO.setDisplayName("thirdeye-test-dataset");
+ datasetConfigDTO.setTimeDuration(1);
+ datasetConfigDTO.setTimeUnit(TimeUnit.DAYS);
+ datasetConfigDTO.setTimezone("UTC");
+ datasetConfigDTO.setLastRefreshTime(START_TIME + 4 * GRANULARITY - 1);
+ datasetConfigDTO.setDataSource("TestSource");
+ this.datasetDAO.save(datasetConfigDTO);
+
+ this.provider = new MockDataProvider()
+ .setLoader(this.loader)
+ .setMetrics(Collections.singletonList(metricConfigDTO))
+ .setDatasets(Collections.singletonList(datasetConfigDTO))
+ .setAnomalies(Collections.emptyList());
+
+ detectionConfigDTO = translateSlaConfig(-1, "sla-config-1.yaml");
+ this.detectorId = detectionConfigDTO.getId();
+
+ this.info = new DetectionPipelineTaskInfo();
+ this.info.setConfigId(this.detectorId);
+ this.context = new TaskContext();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void afterMethod() {
+ this.testDAOProvider.cleanup();
+ }
+
+ /**
+ * Translates the config from filePath; saves it as a new detection config when detectionId < 0, otherwise updates the existing config with that id
+ */
+ private DetectionConfigDTO translateSlaConfig(long detectionId, String filePath) throws IOException {
+ String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream(filePath), StandardCharsets.UTF_8);
+ DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+ DetectionConfigDTO detectionConfigDTO = translator.translate();
+ if (detectionId < 0) {
+ detectionConfigDTO.setId(this.detectionDAO.save(detectionConfigDTO));
+ } else {
+ detectionConfigDTO.setId(detectionId);
+ this.detectionDAO.update(detectionConfigDTO);
+ }
+ return detectionConfigDTO;
+ }
+
+ /**
+ * Prepare slice [offsetStart, offsetEnd)
+ */
+ private MetricSlice prepareMetricSlice(long offsetStart, long offsetEnd) {
+ long start = START_TIME + offsetStart * GRANULARITY;
+ long end = START_TIME + offsetEnd * GRANULARITY;
+ return MetricSlice.from(123L, start, end);
+ }
+
+ private Map<MetricSlice, DataFrame> prepareMockTimeseriesMap(long offsetStart, long offsetEnd) {
+ Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
+ MetricSlice slice = prepareMetricSlice(offsetStart, offsetEnd);
+ timeSeries.put(slice, prepareMockTimeseries(offsetStart, offsetEnd));
+ return timeSeries;
+ }
+
+ /**
+ * Prepares a mock time-series over [offsetStart, offsetEnd] with geometric-progression values starting at 100
+ */
+ private DataFrame prepareMockTimeseries(long offsetStart, long offsetEnd) {
+ long[] values = new long[(int) (offsetEnd - offsetStart + 1)];
+ long[] timestamps = new long[(int) (offsetEnd - offsetStart + 1)];
+
+ long gpFactor = 2;
+ long currentValue = 100;
+ for (long i = offsetStart; i <= offsetEnd; i++) {
+ int index = (int) (i - offsetStart);
+ values[index] = currentValue;
+ timestamps[index] = START_TIME + i * GRANULARITY;
+ currentValue = currentValue * gpFactor;
+ }
+
+ return new DataFrame()
+ .addSeries(COL_VALUE, values)
+ .addSeries(COL_TIME, timestamps);
+ }
+
+ private List<MergedAnomalyResultDTO> retrieveAllAnomalies() {
+ List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
+ // remove id and create_time for comparison purposes
+ anomalies.forEach(anomaly -> anomaly.setId(null));
+ anomalies.forEach(anomaly -> anomaly.setCreatedTime(null));
+ return anomalies;
+ }
+
+ private void cleanUpAnomalies() {
+ List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
+ anomalyDAO.deleteByIds(anomalies.stream().map(AbstractDTO::getId).collect(Collectors.toList()));
+ }
+
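+ // Builds the expected SLA anomaly for the window [offsetStart, offsetEnd) so it can be compared against the anomalies created by the pipeline.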
+ private MergedAnomalyResultDTO makeSlaAnomaly(long offsetStart, long offsetEnd, String sla, String ruleName) {
+ MergedAnomalyResultDTO anomalyDTO = new MergedAnomalyResultDTO();
+ anomalyDTO.setStartTime(START_TIME + offsetStart * GRANULARITY);
+ anomalyDTO.setEndTime(START_TIME + offsetEnd * GRANULARITY);
+ anomalyDTO.setMetricUrn("thirdeye:metric:123");
+ anomalyDTO.setMetric("thirdeye-test");
+ anomalyDTO.setCollection("thirdeye-test-dataset");
+ anomalyDTO.setType(AnomalyType.DATA_SLA);
+ anomalyDTO.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
+ anomalyDTO.setDetectionConfigId(this.detectorId);
+ anomalyDTO.setChildIds(Collections.emptySet());
+
+ Map<String, String> anomalyProps = new HashMap<>();
+ long start = START_TIME + offsetStart * GRANULARITY;
+ anomalyProps.put("datasetLastRefreshTime", String.valueOf(start - 1));
+ anomalyProps.put("sla", sla);
+ anomalyProps.put("detectorComponentName", ruleName);
+ anomalyProps.put("subEntityName", "test_sla_alert");
+ anomalyDTO.setProperties(anomalyProps);
+ return anomalyDTO;
+ }
+
+ /**
+ * Test if a Data SLA anomaly is created when data is not available
+ */
+ @Test
+ public void testDataSlaWhenDataIsMissing() throws Exception {
+ Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap(0, 3);
+ MockDataProvider mockDataProvider = new MockDataProvider()
+ .setLoader(this.loader)
+ .setTimeseries(timeSeries)
+ .setMetrics(Collections.singletonList(metricConfigDTO))
+ .setDatasets(Collections.singletonList(datasetConfigDTO))
+ .setAnomalies(Collections.emptyList());
+ DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+ this.detectionDAO,
+ this.anomalyDAO,
+ this.evaluationDAO,
+ this.loader,
+ mockDataProvider
+ );
+
+ // CHECK 0: Report data missing immediately if delayed even by a minute (sla = 0_DAYS)
+ // This comes into effect if the detection runs more frequently than the dataset granularity.
+ detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-0.yaml");
+
+ // 1st scan - sla breach
+ // time: 3____4 5 // We have data till 3rd, data for 4th is missing/delayed
+ // scan: |----|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 5 * GRANULARITY);
+ runner.execute(this.info, this.context);
+ // 1 data sla anomaly should be created
+ List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
+ expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ // clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+
+ // CHECK 1: Report data delayed by at least a day (sla = 1_DAYS)
+ detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-1.yaml");
+
+ // 1st scan - sla breach
+ // time: 3____4 5 // We have data till 3rd, data for 4th is missing/delayed
+ // scan: |----|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 5 * GRANULARITY);
+ runner.execute(this.info, this.context);
+ // 1 data sla anomaly should be created
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ // clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+
+
+ // CHECK 2: Report data missing when delayed (sla = 2_DAYS)
+ detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-2.yaml");
+
+ // 1st scan after issue - no sla breach
+ // time: 3____4 5 // We have data till 3rd, data for 4th is missing/delayed
+ // scan: |----|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 5 * GRANULARITY);
+ runner.execute(this.info, this.context);
+ // 0 data sla anomaly should be created as the delay is still within the sla
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 0);
+
+ // 2nd scan after issue - sla breach
+ // time: 3____4 5 6 // Data for 4th still hasn't arrived
+ // scan: |---------|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 6 * GRANULARITY);
+ runner.execute(this.info, this.context);
+ // 1 data sla anomaly should be created as the data breaches the 2-day sla (by 1 ms)
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ expectedAnomalies.add(makeSlaAnomaly(4, 6, "2_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ // clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+
+
+ // CHECK 3: Report data missing when delayed by (sla = 3_DAYS)
+ detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-3.yaml");
+
+ // 1st scan after issue - no sla breach
+ // time: 3____4 5 // We have data till 3rd, data for 4th is missing/delayed
+ // scan: |----|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 5 * GRANULARITY);
+ runner.execute(this.info, this.context);
+ // 0 data sla anomaly should be created as the delay is still within the sla
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 0);
+
+ // 2nd scan after issue - no sla breach
+ // time: 3____4 5 6 // Data for 4th still hasn't arrived, no sla breach
+ // scan: |---------|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 6 * GRANULARITY);
+ runner.execute(this.info, this.context);
+ // 0 data sla anomaly should be created as the delay is still within the sla
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 0);
+
+ // 3rd scan after issue - sla breach
+ // time: 3____4 5 6 7 // Data for 4th still hasn't arrived
+ // scan: |--------------|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 7 * GRANULARITY);
+ runner.execute(this.info, this.context);
+ // 1 data sla anomaly should be created from 4 to 7 as the data has been missing for 3 days
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ expectedAnomalies.add(makeSlaAnomaly(4, 7, "3_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ // clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+
+
+ // CHECK 4: Report data missing when there is a gap between the last refresh time & the SLA window start
+ detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-1.yaml");
+
+ // 2 cases are possible:
+ // a. with availability events
+ // b. no availability events - directly query source for just the window
+
+ // a. with availability events
+ // 1st scan after issue - sla breach
+ // time: 3____4 5 6 // We have data till 3rd, data since 4th is missing/delayed
+ // scan: |----|
+ this.info.setStart(START_TIME + 5 * GRANULARITY);
+ this.info.setEnd(START_TIME + 6 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // 1 data sla anomaly should be created from 4 to 6
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ // clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+
+
+ // b. no availability events - directly query source
+ datasetConfigDTO.setLastRefreshTime(0);
+ datasetDAO.update(datasetConfigDTO);
+ // 1st scan after issue - sla breach
+ // time: 3____4 5 6 // We have data till 3rd, data since 4th is missing/delayed
+ // scan: |----|
+ this.info.setStart(START_TIME + 5 * GRANULARITY);
+ this.info.setEnd(START_TIME + 6 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // 1 data sla anomaly should be created from 5 to 6 (only the scanned window), since the source is queried directly
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ expectedAnomalies.add(makeSlaAnomaly(5, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ // clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+ }
+
+ /**
+ * Test that data SLA anomalies are created when data is partially available
+ */
+ @Test
+ public void testDataSlaWhenDataPartiallyAvailable() throws Exception {
+ Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
+ // Assumption - As per data availability info, we have data till 3rd.
+ datasetConfigDTO.setLastRefreshTime(START_TIME + 4 * GRANULARITY - 1);
+ datasetDAO.update(datasetConfigDTO);
+
+ MockDataProvider mockDataProvider = new MockDataProvider()
+ .setLoader(this.loader)
+ .setTimeseries(timeSeries)
+ .setMetrics(Collections.singletonList(metricConfigDTO))
+ .setDatasets(Collections.singletonList(datasetConfigDTO))
+ .setAnomalies(Collections.emptyList());
+ DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+ this.detectionDAO,
+ this.anomalyDAO,
+ this.evaluationDAO,
+ this.loader,
+ mockDataProvider
+ );
+
+ // Create detection window (2 - 10) with a lot of empty data points (no data after 3rd).
+ MetricSlice sliceWithManyEmptyDataPoints = prepareMetricSlice(2, 10);
+ timeSeries.put(sliceWithManyEmptyDataPoints, prepareMockTimeseries(2, 3));
+ this.info.setStart(sliceWithManyEmptyDataPoints.getStart());
+ this.info.setEnd(sliceWithManyEmptyDataPoints.getEnd());
+ runner.execute(this.info, this.context);
+
+ // 1 data sla anomaly should be created for the missing days
+ List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
+ expectedAnomalies.add(makeSlaAnomaly(4, 10, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ //clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+
+
+ // Create another detection window (2 - 10) with no empty data points.
+ MetricSlice sliceWithFullDataPoints = prepareMetricSlice(2, 10);
+ timeSeries.put(sliceWithFullDataPoints, prepareMockTimeseries(2, 9));
+ this.info.setStart(sliceWithFullDataPoints.getStart());
+ this.info.setEnd(sliceWithFullDataPoints.getEnd());
+ runner.execute(this.info, this.context);
+
+ // 0 data sla anomaly should be created as data is considered to be completely available (above threshold)
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 0);
+
+
+ // Create another detection window (2 - 10) with more data points than the data availability event.
+ MetricSlice sliceWithMoreDataPointsThanEvent = prepareMetricSlice(2, 10);
+ timeSeries.put(sliceWithMoreDataPointsThanEvent, prepareMockTimeseries(2, 7));
+ this.info.setStart(sliceWithMoreDataPointsThanEvent.getStart());
+ this.info.setEnd(sliceWithMoreDataPointsThanEvent.getEnd());
+ runner.execute(this.info, this.context);
+
+ // 1 data sla anomaly should be created for the missing days
+ // The data source has more data than the availability event reports; the sla anomaly should reflect that.
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ expectedAnomalies = new ArrayList<>();
+ expectedAnomalies.add(makeSlaAnomaly(8, 10, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ //clean up
+ expectedAnomalies.clear();
+ cleanUpAnomalies();
+ }
+
+ /**
+ * Test that no Data SLA anomaly is created when data is complete and available
+ */
+ @Test
+ public void testDataSlaWhenDataAvailable() throws Exception {
+ Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap(0, 3);
+ MockDataProvider mockDataProvider = new MockDataProvider()
+ .setLoader(this.loader)
+ .setTimeseries(timeSeries)
+ .setMetrics(Collections.singletonList(metricConfigDTO))
+ .setDatasets(Collections.singletonList(datasetConfigDTO))
+ .setAnomalies(Collections.emptyList());
+ DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+ this.detectionDAO,
+ this.anomalyDAO,
+ this.evaluationDAO,
+ this.loader,
+ mockDataProvider
+ );
+
+ // Prepare the data sla task over existing data in time-series
+ this.info.setStart(START_TIME + 1 * GRANULARITY);
+ this.info.setEnd(START_TIME + 3 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // No data sla anomaly should be created
+ List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 0);
+
+ // Prepare the data sla task over existing data in time-series
+ this.info.setStart(START_TIME);
+ this.info.setEnd(START_TIME + 4 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // No data sla anomaly should be created
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 0);
+ }
+
+ /**
+ * Note that we do not support sla on dimensional data. This test validates that no SLA anomalies are created
+ * when some dimensional data is missing but the overall dataset is complete.
+ */
+ @Test
+ public void testDataSlaWhenDimensionalDataUnavailable() throws Exception {
+ Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap(0, 3);
+ // Prepare the mock time-series where 2 dimensional data points are missing but the overall metric has complete data
+ // Cached slice time window represents that we have complete data (4 points)
+ Multimap<String, String> filters = HashMultimap.create();
+ filters.put("dim1", "1");
+ MetricSlice dimensionSlice = MetricSlice.from(123L, START_TIME, START_TIME + 4 * GRANULARITY, filters);
+ // create 2 data points
+ timeSeries.put(dimensionSlice, prepareMockTimeseries(0, 1));
+ MockDataProvider mockDataProvider = new MockDataProvider()
+ .setLoader(this.loader)
+ .setTimeseries(timeSeries)
+ .setMetrics(Collections.singletonList(metricConfigDTO))
+ .setDatasets(Collections.singletonList(datasetConfigDTO))
+ .setAnomalies(Collections.emptyList());
+ DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+ this.detectionDAO,
+ this.anomalyDAO,
+ this.evaluationDAO,
+ this.loader,
+ mockDataProvider
+ );
+
+ // Scan a period where dimensional data is missing but dataset is complete.
+ this.info.setStart(START_TIME + 3 * GRANULARITY);
+ this.info.setEnd(START_TIME + 4 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // 0 SLA anomaly should be created. Though dimensional data is missing, the dataset as a whole is complete.
+ List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 0);
+
+ // Prepare the data sla task on unavailable data
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 5 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // 1 SLA anomaly should be created
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ Assert.assertEquals(anomalies.get(0).getStartTime(), START_TIME + 4 * GRANULARITY);
+ Assert.assertEquals(anomalies.get(0).getEndTime(), START_TIME + 5 * GRANULARITY);
+ Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_SLA);
+ Assert.assertTrue(anomalies.get(0).getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION));
+ Map<String, String> anomalyProps = new HashMap<>();
+ anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * GRANULARITY - 1));
+ anomalyProps.put("sla", "1_DAYS");
+ anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA");
+ anomalyProps.put("subEntityName", "test_sla_alert");
+ Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
+ }
+
+ /**
+ * Test if data sla anomalies are properly merged
+ */
+ @Test
+ public void testDataSlaAnomalyMerge() throws Exception {
+ Map<MetricSlice, DataFrame> timeSeries = prepareMockTimeseriesMap( 0, 3);
+ MockDataProvider mockDataProvider = new MockDataProvider()
+ .setLoader(this.loader)
+ .setTimeseries(timeSeries)
+ .setMetrics(Collections.singletonList(metricConfigDTO))
+ .setDatasets(Collections.singletonList(datasetConfigDTO))
+ .setAnomalies(Collections.emptyList());
+ DataQualityPipelineTaskRunner runner = new DataQualityPipelineTaskRunner(
+ this.detectionDAO,
+ this.anomalyDAO,
+ this.evaluationDAO,
+ this.loader,
+ mockDataProvider
+ );
+
+ // 1st scan
+ // time: 3____4 5 // We have data till 3rd, data for 4th is missing/delayed
+ // scan: |----|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 5 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // 1 data sla anomaly should be created from 4 to 5
+ List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 1);
+ MergedAnomalyResultDTO slaAnomaly = anomalies.get(0);
+ Assert.assertEquals(slaAnomaly.getStartTime(), START_TIME + 4 * GRANULARITY);
+ Assert.assertEquals(slaAnomaly.getEndTime(), START_TIME + 5 * GRANULARITY);
+ Assert.assertEquals(slaAnomaly.getType(), AnomalyType.DATA_SLA);
+ Assert.assertEquals(slaAnomaly.getAnomalyResultSource(), AnomalyResultSource.DATA_QUALITY_DETECTION);
+ Map<String, String> anomalyProps = new HashMap<>();
+ anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * GRANULARITY - 1));
+ anomalyProps.put("sla", "1_DAYS");
+ anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA");
+ anomalyProps.put("subEntityName", "test_sla_alert");
+ Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
+
+ // 2nd scan
+ // time: 3____4 5 6 // Data for 4th still hasn't arrived
+ // scan: |---------|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 6 * GRANULARITY);
+ runner.execute(this.info, this.context);
+
+ // We should now have 2 anomalies in our database (4 to 5) and (4 to 6)
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 2);
+ List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
+ expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
+ expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+
+ // 3rd scan
+ // time: 3____4____5 6 7 // Data for 4th has arrived by now, data for 5th is still missing
+ // scan: |--------------|
+ this.info.setStart(START_TIME + 4 * GRANULARITY);
+ this.info.setEnd(START_TIME + 7 * GRANULARITY);
+ datasetConfigDTO.setLastRefreshTime(START_TIME + 5 * GRANULARITY - 1);
+ datasetDAO.update(datasetConfigDTO);
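+ // Data for day 4 has now arrived (lastRefreshTime advanced above); recreate the task runner before the 3rd scan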
+ runner = new DataQualityPipelineTaskRunner(
+ this.detectionDAO,
+ this.anomalyDAO,
+ this.evaluationDAO,
+ this.loader,
+ mockDataProvider
+ );
+ runner.execute(this.info, this.context);
+
+ // We will now have 3 anomalies in our database
+ // In the current iteration we will create 1 new anomaly from (5 to 7)
+ anomalies = retrieveAllAnomalies();
+ Assert.assertEquals(anomalies.size(), 3);
+ expectedAnomalies = new ArrayList<>();
+ expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
+ expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+ expectedAnomalies.add(makeSlaAnomaly(5, 7, "1_DAYS", "slaRule1:DATA_SLA"));
+ Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
+ }
+}
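For readers following the merge assertions above: below is a minimal, hypothetical sketch of what the makeSlaAnomaly(startOffset, endOffset, sla, component) helper is assumed to build. The real helper is defined elsewhere in this test class and is not part of this excerpt; the fields shown simply mirror what the assertions compare (start/end offsets, the DATA_SLA type, the DATA_QUALITY_DETECTION source, and the anomaly properties map).
```
// Hypothetical reconstruction of the makeSlaAnomaly helper, based only on the fields the
// assertions above check. It relies on the test class's START_TIME, GRANULARITY and
// datasetConfigDTO fields and may differ from the actual helper in the committed test.
private MergedAnomalyResultDTO makeSlaAnomaly(int startOffset, int endOffset, String sla, String component) {
  MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
  anomaly.setStartTime(START_TIME + startOffset * GRANULARITY);
  anomaly.setEndTime(START_TIME + endOffset * GRANULARITY);
  anomaly.setType(AnomalyType.DATA_SLA);
  anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);

  Map<String, String> props = new HashMap<>();
  // Assumption: the refresh time recorded on the dataset config at construction time
  props.put("datasetLastRefreshTime", String.valueOf(datasetConfigDTO.getLastRefreshTime()));
  props.put("sla", sla);
  props.put("detectorComponentName", component);
  props.put("subEntityName", "test_sla_alert");
  anomaly.setProperties(props);
  return anomaly;
}
```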
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunnerTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunnerTest.java
deleted file mode 100644
index dd58ef7..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/datasla/DatasetSlaTaskRunnerTest.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.datasla;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.pinot.thirdeye.anomaly.AnomalyType;
-import org.apache.pinot.thirdeye.anomaly.task.TaskContext;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
-import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-public class DatasetSlaTaskRunnerTest {
- private DetectionPipelineTaskInfo info;
- private TaskContext context;
- private DAOTestBase testDAOProvider;
- private DetectionConfigManager detectionDAO;
- private DatasetConfigManager datasetDAO;
- private MergedAnomalyResultManager anomalyDAO;
- private EvaluationManager evaluationDAO;
- private MetricConfigDTO metricConfigDTO;
- private DatasetConfigDTO datasetConfigDTO;
- private DetectionConfigDTO detectionConfigDTO;
- private long detectorId;
-
- private final Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
- private DataFrame dataFrame;
-
- private static long period = TimeUnit.DAYS.toMillis(1);
- private static DateTimeZone timezone = DateTimeZone.forID("UTC");
- private static long startTime = new DateTime(System.currentTimeMillis(), timezone).withMillisOfDay(0).getMillis();
-
- @BeforeMethod
- public void beforeMethod() {
- this.testDAOProvider = DAOTestBase.getInstance();
- this.detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
- this.datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
- this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
- this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
-
- metricConfigDTO = new MetricConfigDTO();
- metricConfigDTO.setId(123L);
- metricConfigDTO.setName("thirdeye-test");
- metricConfigDTO.setDataset("thirdeye-test-dataset");
-
- datasetConfigDTO = new DatasetConfigDTO();
- datasetConfigDTO.setId(124L);
- datasetConfigDTO.setDataset("thirdeye-test-dataset");
- datasetConfigDTO.setTimeDuration(1);
- datasetConfigDTO.setTimeUnit(TimeUnit.DAYS);
- datasetConfigDTO.setTimezone("UTC");
- datasetConfigDTO.setLastRefreshTime(startTime + 4 * period - 1);
- this.datasetDAO.save(datasetConfigDTO);
-
- detectionConfigDTO = new DetectionConfigDTO();
- detectionConfigDTO.setName("myName");
- detectionConfigDTO.setDescription("myDescription");
- detectionConfigDTO.setCron("myCron");
- Map<String, Object> metricSla = new HashMap<>();
- metricSla.put("sla", "1_DAYS");
- Map<String, Object> props = new HashMap<>();
- props.put("thirdeye:metric:123", metricSla);
- detectionConfigDTO.setDataSLAProperties(props);
- this.detectorId = this.detectionDAO.save(detectionConfigDTO);
-
- this.info = new DetectionPipelineTaskInfo();
- this.info.setConfigId(this.detectorId);
- this.context = new TaskContext();
-
- // Prepare the mock time-series data with 1 ms granularity
- this.dataFrame = new DataFrame()
- .addSeries(COL_VALUE, 100, 200, 500, 1000)
- .addSeries(COL_TIME, startTime, startTime + period, startTime + 2*period, startTime + 3*period);
- MetricSlice slice = MetricSlice.from(123L, startTime, startTime + 4*period);
- this.timeSeries.put(slice, dataFrame);
- }
-
- @AfterMethod(alwaysRun = true)
- public void afterMethod() {
- this.testDAOProvider.cleanup();
- }
-
- /**
- * Test if a Data SLA anomaly is created when data is not available
- */
- @Test
- public void testDataSlaWhenDataIsMissing() throws Exception {
- MockDataProvider mockDataProvider = new MockDataProvider()
- .setTimeseries(timeSeries)
- .setMetrics(Collections.singletonList(metricConfigDTO))
- .setDatasets(Collections.singletonList(datasetConfigDTO));
- Map<String, Object> metricSla = new HashMap<>();
- Map<String, Object> props = new HashMap<>();
- DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
- // CHECK 1: Report data missing immediately (sla - 0 ms)
- metricSla.put("sla", "1_DAYS");
- props.put("thirdeye:metric:123", metricSla);
- detectionConfigDTO.setDataSLAProperties(props);
- this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
- // 1st scan after issue
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 5 * period);
- runner.execute(this.info, this.context);
- // 1 data sla anomaly should be created
- List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 5 * period);
- Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
- anomalyDAO.delete(anomalies.get(0)); // clean up
-
-
- // CHECK 2: Report data missing when delayed by (sla - 1 day)
- metricSla.put("sla", "2_DAYS");
- props.put("thirdeye:metric:123", metricSla);
- detectionConfigDTO.setDataSLAProperties(props);
- this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
- // 1st scan after issue
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 5 * period);
- runner.execute(this.info, this.context);
- // 0 data sla anomaly should be created as there is no delay
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 0);
-
- // 2nd scan after issue
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 6 * period);
- runner.execute(this.info, this.context);
- // 1 data sla anomaly should be created as data is delayed by 1 ms
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 6 * period);
- Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
- Map<String, String> anomalyProps = new HashMap<>();
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "2_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- anomalyDAO.delete(anomalies.get(0)); // clean up
-
-
- // CHECK 3: Report data missing when delayed by (sla - 2 ms)
- metricSla.put("sla", "3_DAYS");
- props.put("thirdeye:metric:123", metricSla);
- detectionConfigDTO.setDataSLAProperties(props);
- this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
- // 1st scan after issue
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 5 * period);
- runner.execute(this.info, this.context);
- // 0 data sla anomaly should be created as there is no delay
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 0);
-
- // 2nd scan after issue
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 6 * period);
- runner.execute(this.info, this.context);
- // 0 data sla anomaly should be created as there is no delay
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 0);
-
- // 3rd scan after issue
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 7 * period);
- runner.execute(this.info, this.context);
- // 1 data sla anomaly should be created as the data is delayed by 2 ms
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 7 * period);
- Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "3_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- anomalyDAO.delete(anomalies.get(0)); // clean up
-
-
- // CHECK 4: Report data missing when there is a gap btw LastRefreshTime & SLA window start
- metricSla.put("sla", "1_DAYS");
- props.put("thirdeye:metric:123", metricSla);
- detectionConfigDTO.setDataSLAProperties(props);
- this.detectorId = this.detectionDAO.update(detectionConfigDTO);
- this.info.setStart(startTime + 5 * period);
- this.info.setEnd(startTime + 6 * period);
- runner.execute(this.info, this.context);
-
- // 1 data sla anomaly should be created
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 6 * period);
- Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "1_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- anomalyDAO.delete(anomalies.get(0)); // clean up
- }
-
- /**
- * Test that no data SLA anomalies are created when data is partially available
- * We do not deal with partial data in this current iteration/phase
- */
- @Test
- public void testDataSlaWhenDataPartiallyAvailable() throws Exception {
- MetricSlice sliceOverlapBelowThreshold = MetricSlice.from(123L, startTime + 2 * period, startTime + 10 * period);
- timeSeries.put(sliceOverlapBelowThreshold, new DataFrame()
- .addSeries(COL_VALUE, 500, 1000)
- .addSeries(COL_TIME, startTime + 2 * period, startTime + 3 * period));
- MetricSlice sliceOverlapAboveThreshold = MetricSlice.from(123L, startTime + 2 * period, startTime + 4 * period);
- timeSeries.put(sliceOverlapAboveThreshold, new DataFrame()
- .addSeries(COL_VALUE, 500, 1000)
- .addSeries(COL_TIME, startTime + 2 * period, startTime + 3 * period));
- MockDataProvider mockDataProvider = new MockDataProvider()
- .setTimeseries(timeSeries)
- .setMetrics(Collections.singletonList(metricConfigDTO))
- .setDatasets(Collections.singletonList(datasetConfigDTO));
- DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
- datasetConfigDTO.setLastRefreshTime(0);
- datasetDAO.update(datasetConfigDTO);
-
- this.info.setStart(sliceOverlapBelowThreshold.getStart());
- this.info.setEnd(sliceOverlapBelowThreshold.getEnd());
- runner.execute(this.info, this.context);
-
- // 1 data sla anomaly should be created for the missing days
- List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 10 * period);
- Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
- Map<String, String> anomalyProps = new HashMap<>();
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 3 * period));
- anomalyProps.put("sla", "1_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- anomalyDAO.delete(anomalies.get(0)); // clean up
-
- this.info.setStart(sliceOverlapAboveThreshold.getStart());
- this.info.setEnd(sliceOverlapAboveThreshold.getEnd());
- runner.execute(this.info, this.context);
-
- // 0 data sla anomalies should be created as the data is considered completely available (above threshold)
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 0);
- }
-
- /**
- * Test that no Data SLA anomaly is created when data is complete and available
- */
- @Test
- public void testDataSlaWhenDataAvailable() throws Exception {
- MockDataProvider mockDataProvider = new MockDataProvider()
- .setTimeseries(timeSeries)
- .setMetrics(Collections.singletonList(metricConfigDTO))
- .setDatasets(Collections.singletonList(datasetConfigDTO));
- DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
- // Prepare the data sla task over existing data in time-series
- this.info.setStart(startTime + period);
- this.info.setEnd(startTime + 3 * period);
- runner.execute(this.info, this.context);
-
- // No data sla anomaly should be created
- List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 0);
-
- // Prepare the data sla task over existing data in time-series
- this.info.setStart(startTime);
- this.info.setEnd(startTime + 4 * period);
- runner.execute(this.info, this.context);
-
- // No data sla anomaly should be created
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 0);
- }
-
- /**
- * Test that no data SLA anomaly is created when dataset is available but dimensional data is missing
- */
- @Test
- public void testDataSlaWhenDimensionalDataUnavailable() throws Exception {
- // Prepare the mock dimensional time-series where 2 data points are missing but the overall dataset has complete data
- Multimap<String, String> filters = HashMultimap.create();
- filters.put("dim1", "1");
- MetricSlice dimensionSlice = MetricSlice.from(123L, startTime, startTime + 4 * period, filters);
- Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
- // has only 2 data points
- DataFrame dataFrame = new DataFrame()
- .addSeries(COL_VALUE, 100, 200)
- .addSeries(COL_TIME, startTime, startTime + period);
- timeSeries.put(dimensionSlice, dataFrame);
- MockDataProvider mockDataProvider = new MockDataProvider()
- .setTimeseries(timeSeries)
- .setMetrics(Collections.singletonList(metricConfigDTO))
- .setDatasets(Collections.singletonList(datasetConfigDTO));
- DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
- Map<String, Object> metricSla = new HashMap<>();
- metricSla.put("sla", "1_DAYS");
- Map<String, Object> props = new HashMap<>();
- props.put("thirdeye:metric:123:dim1%3D1", metricSla);
- detectionConfigDTO.setDataSLAProperties(props);
- this.detectorId = this.detectionDAO.update(detectionConfigDTO);
-
- // Scan a period where dimensional data is missing but dataset is complete.
- this.info.setConfigId(this.detectorId);
- this.info.setStart(startTime + 3 * period);
- this.info.setEnd(startTime + 4 * period);
- runner.execute(this.info, this.context);
-
- // 0 SLA anomaly should be created. Though dimensional data is missing, the dataset as a whole is complete.
- List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 0);
-
- // Prepare the data sla task on unavailable data
- this.info.setConfigId(this.detectorId);
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 5 * period);
- runner.execute(this.info, this.context);
-
- // 1 SLA anomaly should be created
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(anomalies.get(0).getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalies.get(0).getEndTime(), startTime + 5 * period);
- Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_MISSING);
- Map<String, String> anomalyProps = new HashMap<>();
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "1_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- }
-
- /**
- * Test if data sla anomalies are properly merged
- */
- @Test
- public void testDataSlaAnomalyMerge() throws Exception {
- MockDataProvider mockDataProvider = new MockDataProvider()
- .setTimeseries(timeSeries)
- .setMetrics(Collections.singletonList(metricConfigDTO))
- .setDatasets(Collections.singletonList(datasetConfigDTO));
- DatasetSlaTaskRunner runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
-
- // 1st scan
- // time: 3____4 5 // We have data till 3rd, data for 4th is missing/delayed
- // scan: |----|
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 5 * period);
- runner.execute(this.info, this.context);
-
- // 1 data sla anomaly should be created from 4 to 5
- List<MergedAnomalyResultDTO> anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 1);
- MergedAnomalyResultDTO slaAnomaly = anomalies.get(0);
- Assert.assertEquals(slaAnomaly.getStartTime(), startTime + 4 * period);
- Assert.assertEquals(slaAnomaly.getEndTime(), startTime + 5 * period);
- Assert.assertEquals(slaAnomaly.getType(), AnomalyType.DATA_MISSING);
- Map<String, String> anomalyProps = new HashMap<>();
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "1_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
-
- // 2nd scan
- // time: 3____4 5 6 // Data for 4th still hasn't arrived
- // scan: |---------|
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 6 * period);
- runner.execute(this.info, this.context);
-
- // We should now have 2 anomalies in our database (1 parent, 1 child)
- // The new anomaly (4 to 6) will be parent of the existing anomaly (4 to 5) created during the previous scan
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 2);
- for (int i = 0; i < 2; i++) {
- MergedAnomalyResultDTO anomalyDTO = anomalies.get(i);
- if (!anomalyDTO.isChild()) {
- // Parent anomaly
- Assert.assertEquals(anomalyDTO.getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalyDTO.getEndTime(), startTime + 6 * period);
- Assert.assertEquals(anomalyDTO.getType(), AnomalyType.DATA_MISSING);
- Assert.assertTrue(anomalyDTO.getChildIds().contains(slaAnomaly.getId()));
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "1_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- } else {
- // Child anomaly
- Assert.assertEquals(anomalyDTO.getStartTime(), startTime + 4 * period);
- Assert.assertEquals(anomalyDTO.getEndTime(), startTime + 5 * period);
- Assert.assertEquals(anomalyDTO.getType(), AnomalyType.DATA_MISSING);
- Assert.assertTrue(anomalyDTO.isChild());
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "1_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- }
- }
-
- // 3rd scan
- // time: 3____4____5 6 7 // Data for 4th has arrived by now, data for 5th is still missing
- // scan: |--------------|
- this.info.setStart(startTime + 4 * period);
- this.info.setEnd(startTime + 7 * period);
- datasetConfigDTO.setLastRefreshTime(startTime + 5 * period - 1);
- datasetDAO.update(datasetConfigDTO);
- runner = new DatasetSlaTaskRunner(detectionDAO, anomalyDAO, evaluationDAO, mockDataProvider);
- runner.execute(this.info, this.context);
-
- // We will now have 3 anomalies in our database (2 parent and 1 child)
- // 1 parent and 1 child created during the previous iterations
- // In the current iteration we will create 1 new anomaly from (5 to 7)
- anomalies = anomalyDAO.findAll();
- Assert.assertEquals(anomalies.size(), 3);
- anomalies = anomalies.stream().filter(anomaly -> !anomaly.isChild()).collect(Collectors.toList());
- // We should now have 2 parent anomalies
- Assert.assertEquals(anomalies.size(), 2);
- anomalies.get(0).setId(null);
- anomalies.get(1).setId(null);
-
- List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
- MergedAnomalyResultDTO anomalyDTO = new MergedAnomalyResultDTO();
- anomalyDTO.setStartTime(startTime + 5 * period);
- anomalyDTO.setEndTime(startTime + 7 * period);
- anomalyDTO.setType(AnomalyType.DATA_MISSING);
- anomalyDTO.setDetectionConfigId(this.detectorId);
- anomalyDTO.setChildIds(Collections.emptySet());
- anomalyProps.put("datasetLastRefreshTime", String.valueOf(startTime + 4 * period - 1));
- anomalyProps.put("sla", "1_DAYS");
- Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
- expectedAnomalies.add(anomalyDTO);
-
- Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigSlaTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigSlaTranslatorTest.java
new file mode 100644
index 0000000..db9087b
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/DetectionConfigSlaTranslatorTest.java
@@ -0,0 +1,121 @@
+package org.apache.pinot.thirdeye.detection.yaml.translator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker;
+import org.apache.pinot.thirdeye.detection.components.MockGrouper;
+import org.apache.pinot.thirdeye.detection.components.RuleBaselineProvider;
+import org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter;
+import org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class DetectionConfigSlaTranslatorTest {
+
+ private Long metricId;
+ private DataProvider provider;
+ private static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private DAOTestBase testDAOProvider;
+ private DAORegistry daoRegistry;
+
+ @BeforeClass
+ void beforeClass() {
+ testDAOProvider = DAOTestBase.getInstance();
+ daoRegistry = DAORegistry.getInstance();
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() {
+ testDAOProvider.cleanup();
+ }
+
+ @BeforeMethod
+ public void setUp() {
+ MetricConfigDTO metricConfig = new MetricConfigDTO();
+ metricConfig.setAlias("alias");
+ metricConfig.setName("test_metric");
+ metricConfig.setDataset("test_dataset");
+ this.metricId = 1L;
+ metricConfig.setId(metricId);
+ daoRegistry.getMetricConfigDAO().save(metricConfig);
+
+ DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
+ datasetConfigDTO.setDataset("test_dataset");
+ datasetConfigDTO.setTimeUnit(TimeUnit.DAYS);
+ datasetConfigDTO.setTimeDuration(1);
+ datasetConfigDTO.setDataSource(PinotThirdEyeDataSource.DATA_SOURCE_NAME);
+ daoRegistry.getDatasetConfigDAO().save(datasetConfigDTO);
+
+ DetectionRegistry.registerComponent(DataSlaQualityChecker.class.getName(), "DATA_SLA");
+ DetectionRegistry.registerComponent(ThresholdRuleDetector.class.getName(), "THRESHOLD");
+ DetectionRegistry.registerComponent(ThresholdRuleAnomalyFilter.class.getName(), "THRESHOLD_RULE_FILTER");
+ DetectionRegistry.registerComponent(RuleBaselineProvider.class.getName(), "RULE_BASELINE");
+ DetectionRegistry.registerComponent(MockGrouper.class.getName(), "MOCK_GROUPER");
+ this.provider = new MockDataProvider().setMetrics(Collections.singletonList(metricConfig)).setDatasets(Collections.singletonList(datasetConfigDTO));
+ }
+
+ @Test
+ public void testSlaTranslation() throws Exception {
+ String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-1.yaml"), "UTF-8");
+ DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+ DetectionConfigDTO result = translator.translate();
+ YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-1.json"), YamlTranslationResult.class);
+ Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+ Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+ }
+
+ @Test
+ public void testDetectionAndSlaTranslation() throws Exception {
+ String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-2.yaml"), "UTF-8");
+ DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+ DetectionConfigDTO result = translator.translate();
+ YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-2.json"), YamlTranslationResult.class);
+ Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+ Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+ }
+
+ @Test
+ public void testMultipleDetectionFilterAndSlaTranslation() throws Exception {
+ String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-3.yaml"), "UTF-8");
+ DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+ DetectionConfigDTO result = translator.translate();
+ YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-3.json"), YamlTranslationResult.class);
+ Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+ Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+ }
+
+ @Test
+ public void testSlaTranslationWithSingleMetricEntityAlert() throws Exception {
+ String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-4.yaml"), "UTF-8");
+ DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+ DetectionConfigDTO result = translator.translate();
+ YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-4.json"), YamlTranslationResult.class);
+ Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+ Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+ }
+
+ @Test
+ public void testSlaTranslationWithMultiMetricEntityAlert() throws Exception {
+ String yamlConfig = IOUtils.toString(this.getClass().getResourceAsStream("sla-config-5.yaml"), "UTF-8");
+ DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, this.provider);
+ DetectionConfigDTO result = translator.translate();
+ YamlTranslationResult expected = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("sla-config-translated-5.json"), YamlTranslationResult.class);
+ Assert.assertEquals(result.getDataQualityProperties(), expected.getDataQualityProperties());
+ Assert.assertEquals(result.getComponentSpecs(), expected.getComponents());
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java
index 10055fa..3f7dfa4 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlTranslationResult.java
@@ -24,6 +24,7 @@ import java.util.Map;
public class YamlTranslationResult {
private Map<String, Object> properties;
+ private Map<String, Object> dataQualityProperties;
private Map<String, Object> components;
private String cron;
@@ -35,6 +36,14 @@ public class YamlTranslationResult {
return properties;
}
+ public void setDataQualityProperties(Map<String, Object> dataQualityProperties) {
+ this.dataQualityProperties = dataQualityProperties;
+ }
+
+ public Map<String, Object> getDataQualityProperties() {
+ return dataQualityProperties;
+ }
+
public void setComponents(Map<String, Object> components) {
this.components = components;
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java
index 5db1d3d..60d0d23 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/content/templates/TestMetricAnomaliesContent.java
@@ -18,6 +18,7 @@ package org.apache.pinot.thirdeye.notification.content.templates;
import java.util.Properties;
import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
@@ -174,7 +175,8 @@ public class TestMetricAnomaliesContent {
new DateTime(2020, 1, 7, 17, 0, dateTimeZone).getMillis(),
TEST, TEST, 0.1, 1l, new DateTime(2020, 1, 7, 17, 1, dateTimeZone).getMillis());
anomaly.setDetectionConfigId(detectionConfigDTO.getId());
- anomaly.setType(AnomalyType.DATA_MISSING);
+ anomaly.setType(AnomalyType.DATA_SLA);
+ anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
anomaly.setMetricUrn("thirdeye:metric:3");
Map<String, String> props = new HashMap<>();
props.put("sla", "3_DAYS");
@@ -187,7 +189,8 @@ public class TestMetricAnomaliesContent {
new DateTime(2020, 1, 7, 5, 5, dateTimeZone).getMillis(),
TEST, TEST, 0.1, 1l, new DateTime(2020, 1, 7, 5, 6, dateTimeZone).getMillis());
anomaly.setDetectionConfigId(detectionConfigDTO.getId());
- anomaly.setType(AnomalyType.DATA_MISSING);
+ anomaly.setType(AnomalyType.DATA_SLA);
+ anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
anomaly.setMetricUrn("thirdeye:metric:3");
props = new HashMap<>();
props.put("datasetLastRefreshTime", "" + new DateTime(2020, 1, 4, 0, 0, dateTimeZone).getMillis());
@@ -201,7 +204,8 @@ public class TestMetricAnomaliesContent {
new DateTime(2020, 1, 1, 15, 0, dateTimeZone).getMillis(),
TEST, TEST, 0.1, 1l, new DateTime(2020, 1, 1, 16, 0, dateTimeZone).getMillis());
anomaly.setDetectionConfigId(detectionConfigDTO.getId());
- anomaly.setType(AnomalyType.DATA_MISSING);
+ anomaly.setType(AnomalyType.DATA_SLA);
+ anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
anomaly.setMetricUrn("thirdeye:metric:3");
props = new HashMap<>();
props.put("datasetLastRefreshTime", "" + new DateTime(2020, 1, 1, 10, 5, dateTimeZone).getMillis());
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java
index d20e169..a5311c0 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/notification/formatter/channels/TestJiraContentFormatter.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.pinot.thirdeye.anomaly.AnomalyType;
import org.apache.pinot.thirdeye.anomaly.ThirdEyeAnomalyConfiguration;
import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyResult;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
@@ -129,7 +130,8 @@ public class TestJiraContentFormatter {
anomalyResultDTO2.setDetectionConfigId(this.detectionConfigId);
anomalyResultDTO2.setCollection(COLLECTION_VALUE);
anomalyResultDTO2.setMetric(METRIC_VALUE);
- anomalyResultDTO2.setType(AnomalyType.DATA_MISSING);
+ anomalyResultDTO2.setType(AnomalyType.DATA_SLA);
+ anomalyResultDTO2.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
anomalyResultDTO2.setMetricUrn("thirdeye:metric:124");
Map<String, String> props = new HashMap<>();
props.put("sla", "2_HOURS");
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java
index 777e7db..6931778 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/tools/RunAdhocDatabaseQueriesTool.java
@@ -725,13 +725,13 @@ public class RunAdhocDatabaseQueriesTool {
public static void main(String[] args) throws Exception {
- File persistenceFile = new File(args[0]);
+ File persistenceFile = new File("/Users/akrai/persistence-linux.yml");
if (!persistenceFile.exists()) {
System.err.println("Missing file:" + persistenceFile);
System.exit(1);
}
RunAdhocDatabaseQueriesTool dq = new RunAdhocDatabaseQueriesTool(persistenceFile);
- dq.disableAllActiveDetections(Collections.singleton(142644400L));
+ dq.disableAllActiveDetections(Collections.singleton(160640739L));
LOG.info("DONE");
}
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-0.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-0.yaml
new file mode 100644
index 0000000..947ccd5
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-0.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 0_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-1.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-1.yaml
new file mode 100644
index 0000000..192f6f1
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-1.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 1_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-2.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-2.yaml
new file mode 100644
index 0000000..6c83a3e
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-2.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 2_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-3.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-3.yaml
new file mode 100644
index 0000000..3bffe8d
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/dataquality/sla-config-3.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: thirdeye-test
+dataset: thirdeye-test-dataset
+
+rules:
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 3_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-1.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-1.yaml
new file mode 100644
index 0000000..3d76605
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-1.yaml
@@ -0,0 +1,17 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: test_metric
+dataset: test_dataset
+
+rules:
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 2_DAYS
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-2.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-2.yaml
new file mode 100644
index 0000000..5468efb
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-2.yaml
@@ -0,0 +1,30 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: test_metric
+dataset: test_dataset
+
+filters:
+ D1:
+ - v1
+ - v2
+ D2:
+ - v3
+dimensionExploration:
+ dimensions:
+ - D1
+ - D2
+ minContribution: 0.05
+
+rules:
+- detection:
+ - type: THRESHOLD
+ name: rule1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 2_DAYS
+
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-3.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-3.yaml
new file mode 100644
index 0000000..2c317c1
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-3.yaml
@@ -0,0 +1,65 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+metric: test_metric
+dataset: test_dataset
+
+filters:
+ D1:
+ - v1
+ - v2
+ D2:
+ - v3
+dimensionExploration:
+ dimensions:
+ - D1
+ - D2
+ minContribution: 0.05
+
+rules:
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ filter:
+ - type: THRESHOLD_RULE_FILTER
+ name: thresholdFilter_1
+ params:
+ min: 50
+ - type: THRESHOLD_RULE_FILTER
+ name: thresholdFilter_2
+ params:
+ min: 50
+
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_3
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 2_DAYS
+
+- detection:
+ - type: THRESHOLD
+ name: maxThreshold_2
+ params:
+ max: 100
+ filter:
+ - type: THRESHOLD_RULE_FILTER
+ name: thresholdFilter_3
+ params:
+ min: 50
+
+merger:
+ maxGap: 0
+ maxDuration: 100000000
+
+grouper:
+- type: MOCK_GROUPER
+ name: test_grouper
+ params:
+ mockParam: 0.3
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-4.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-4.yaml
new file mode 100644
index 0000000..4876eb9
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-4.yaml
@@ -0,0 +1,33 @@
+detectionName: test_sla_alert
+description: My test sla alert
+
+type: COMPOSITE_ALERT
+cron: "0 0 14 * * ? *"
+
+alerts:
+- type: METRIC_ALERT
+ name: metric alert on test_metric
+ metric: test_metric
+ dataset: test_dataset
+ filters:
+ D1:
+ - v1
+ - v2
+ D2:
+ - v3
+ dimensionExploration:
+ dimensions:
+ - D1
+ - D2
+ minContribution: 0.05
+ rules:
+ - detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 2_DAYS
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-5.yaml b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-5.yaml
new file mode 100644
index 0000000..d738842
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-5.yaml
@@ -0,0 +1,90 @@
+detectionName: test_sla_alert
+description: My test sla alert
+type: COMPOSITE_ALERT
+cron: "0 0 14 * * ? *"
+alerts:
+- type: METRIC_ALERT
+ name: metric_alert_on_test_metric_1
+ metric: test_metric
+ dataset: test_dataset
+ filters:
+ D1:
+ - v1
+ - v2
+ D2:
+ - v3
+ dimensionExploration:
+ dimensions:
+ - D1
+ - D2
+ minContribution: 0.05
+ rules:
+ - detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ filter:
+ - type: THRESHOLD_RULE_FILTER
+ name: thresholdFilter_1
+ params:
+ min: 50
+ - type: THRESHOLD_RULE_FILTER
+ name: thresholdFilter_2
+ params:
+ min: 100
+ - detection:
+ - type: THRESHOLD
+ name: maxThreshold_3
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule3
+ params:
+ sla: 2_DAYS
+ - detection:
+ - type: THRESHOLD
+ name: maxThreshold_2
+ params:
+ max: 100
+
+- type: COMPOSITE_ALERT
+ name: composite_alert_on_entity
+ alerts:
+ - type: METRIC_ALERT
+ name: metric_alert_on_test_metric_2
+ metric: test_metric
+ dataset: test_dataset
+ rules:
+ - detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule1
+ params:
+ sla: 2_DAYS
+ - type: METRIC_ALERT
+ name: another_metric_alert_on_test_metric
+ metric: test_metric
+ dataset: test_dataset
+ rules:
+ - detection:
+ - type: THRESHOLD
+ name: maxThreshold_1
+ params:
+ max: 100
+ quality:
+ - type: DATA_SLA
+ name: slaRule2
+ params:
+ sla: 2_DAYS
+
+grouper:
+ - type: MOCK_GROUPER
+ name: test_grouper
+ params:
+ mockParam: 0.3
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-1.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-1.json
new file mode 100644
index 0000000..ddebd38
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-1.json
@@ -0,0 +1,23 @@
+{
+ "dataQualityProperties": {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+ "nested": [{
+ "qualityCheck": "$slaRule1:DATA_SLA",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+ "metricUrn": "thirdeye:metric:1",
+ "subEntityName": "test_sla_alert"
+ }]
+ },
+ "components": {
+ "slaRule1:DATA_SLA": {
+ "sla": "2_DAYS",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+ "metricUrn": "thirdeye:metric:1"
+ },
+ "maxThreshold_1:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1"
+ }
+ }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-2.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-2.json
new file mode 100644
index 0000000..59a0b31
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-2.json
@@ -0,0 +1,23 @@
+{
+ "dataQualityProperties": {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+ "nested": [{
+ "qualityCheck": "$slaRule1:DATA_SLA",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+ "subEntityName": "test_sla_alert"
+ }]
+ },
+ "components": {
+ "slaRule1:DATA_SLA": {
+ "sla": "2_DAYS",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "rule1:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ }
+ }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-3.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-3.json
new file mode 100644
index 0000000..e3c363b
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-3.json
@@ -0,0 +1,57 @@
+{
+ "dataQualityProperties": {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+ "maxGap": 0,
+ "maxDuration": 100000000,
+ "nested": [{
+ "qualityCheck": "$slaRule1:DATA_SLA",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+ "subEntityName": "test_sla_alert",
+ "maxGap": 0,
+ "maxDuration": 100000000
+ }]
+ },
+ "components": {
+ "slaRule1:DATA_SLA": {
+ "sla": "2_DAYS",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "maxThreshold_1:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "maxThreshold_2:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "maxThreshold_3:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "thresholdFilter_1:THRESHOLD_RULE_FILTER": {
+ "min": 50,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "thresholdFilter_2:THRESHOLD_RULE_FILTER": {
+ "min": 50,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "thresholdFilter_3:THRESHOLD_RULE_FILTER": {
+ "min": 50,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "test_grouper:MOCK_GROUPER": {
+ "mockParam": 0.3,
+ "className": "org.apache.pinot.thirdeye.detection.components.MockGrouper",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ }
+ }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-4.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-4.json
new file mode 100644
index 0000000..6a788dc
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-4.json
@@ -0,0 +1,26 @@
+{
+ "dataQualityProperties": {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+ "nested": [{
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+ "nested": [{
+ "qualityCheck": "$slaRule1:DATA_SLA",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+ "subEntityName": "metric alert on test_metric"
+ }]
+ }]
+ },
+ "components": {
+ "slaRule1:DATA_SLA": {
+ "sla": "2_DAYS",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "maxThreshold_1:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ }
+ }
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-5.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-5.json
new file mode 100644
index 0000000..58149a0
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/translator/sla-config-translated-5.json
@@ -0,0 +1,103 @@
+{
+ "dataQualityProperties": {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+ "nested": [
+ {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper",
+ "nested": [
+ {
+ "grouper": "$test_grouper:MOCK_GROUPER",
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper",
+ "subEntityName": "test_sla_alert",
+ "nested": [
+ {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+ "nested": [
+ {
+ "qualityCheck": "$slaRule3:DATA_SLA",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3",
+ "subEntityName": "metric_alert_on_test_metric_1"
+ }
+ ]
+ },
+ {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper",
+ "nested": [
+ {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+ "nested": [
+ {
+ "qualityCheck": "$slaRule1:DATA_SLA",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+ "metricUrn": "thirdeye:metric:1",
+ "subEntityName": "metric_alert_on_test_metric_2"
+ }
+ ]
+ },
+ {
+ "className": "org.apache.pinot.thirdeye.detection.wrapper.DataQualityMergeWrapper",
+ "nested": [
+ {
+ "qualityCheck": "$slaRule2:DATA_SLA",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.wrapper.DataSlaWrapper",
+ "metricUrn": "thirdeye:metric:1",
+ "subEntityName": "another_metric_alert_on_test_metric"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ "components": {
+ "slaRule1:DATA_SLA": {
+ "sla": "2_DAYS",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+ "metricUrn": "thirdeye:metric:1"
+ },
+ "slaRule2:DATA_SLA": {
+ "sla": "2_DAYS",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+ "metricUrn": "thirdeye:metric:1"
+ },
+ "slaRule3:DATA_SLA": {
+ "sla": "2_DAYS",
+ "className": "org.apache.pinot.thirdeye.detection.dataquality.components.DataSlaQualityChecker",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "maxThreshold_1:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1"
+ },
+ "maxThreshold_2:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "maxThreshold_3:THRESHOLD": {
+ "max": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "thresholdFilter_1:THRESHOLD_RULE_FILTER": {
+ "min": 50,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "thresholdFilter_2:THRESHOLD_RULE_FILTER": {
+ "min": 100,
+ "className": "org.apache.pinot.thirdeye.detection.components.ThresholdRuleAnomalyFilter",
+ "metricUrn": "thirdeye:metric:1:D1%3Dv1:D1%3Dv2:D2%3Dv3"
+ },
+ "test_grouper:MOCK_GROUPER": {
+ "mockParam": 0.3,
+ "className": "org.apache.pinot.thirdeye.detection.components.MockGrouper"
+ }
+ }
+}
\ No newline at end of file
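The fixture above exercises the composite (entity) case: every metric sub-alert gets its own DataQualityMergeWrapper/DataSlaWrapper branch beneath the grouper, and each DATA_SLA rule is registered in "components" under its "<name>:DATA_SLA" key. A skeletal sketch of a YAML shape that could produce it is below; the "alerts"/"grouper" nesting keys and the elided sub-alert bodies are assumptions about the composite-alert schema, while the names and params are copied from the fixture.

```
# Hypothetical composite-alert YAML skeleton (nesting keys are assumptions;
# names and params come from the fixture above).
detectionName: test_sla_alert
type: COMPOSITE_ALERT
grouper:
- type: MOCK_GROUPER
  name: test_grouper
  params:
    mockParam: 0.3
alerts:
- type: METRIC_ALERT
  name: metric_alert_on_test_metric_1
  rules:
  - detection:
    ...
    quality:
    - type: DATA_SLA
      name: slaRule3
      params:
        sla: 2_DAYS
- type: METRIC_ALERT
  name: metric_alert_on_test_metric_2
  ...
```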
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl b/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl
index 9453cc8..f12b9bb 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl
+++ b/thirdeye/thirdeye-pinot/src/test/resources/test-jira-anomalies-template.ftl
@@ -8,8 +8,8 @@ ThirdEye has detected [*2 anomalies*|test/app/#/anomalies?anomalyIds=4,5] on the
*Description:*
||Start||Duration||Type||Dimensions||Current||Predicted||Change||
-|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=4]|24 hours |Deviation |key:value\\|_0_|_0_ |_+100.00 %_|
-|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=5]|12.75 hours|Data Missing|-|_0_|_2_HOURS_|_+12 hours & 45mins_|
+|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=4]|24 hours |Deviation |key:value\\|_0_|_0_ |_+100.00 %_|
+|[Jan 01, 10:05 PDT|test/app/#/rootcause?anomalyId=5]|12.75 hours|SLA Violation|-|_0_|_2_HOURS_|_+12 hours & 45mins_|
=======================================================================================
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html b/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html
index 9f66b1d..530b07b 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html
+++ b/thirdeye/thirdeye-pinot/src/test/resources/test-metric-anomalies-template.html
@@ -90,7 +90,7 @@
<a style="font-weight: bold; text-decoration: none; font-size:14px; line-height:20px; color: #0073B1;" href="http://localhost:8080/dashboard/app/#/rootcause?anomalyId=6"
target="_blank">(view)</a>
</td>
- <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">Data Missing</td>
+ <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">SLA Violation</td>
<td style="word-break: break-all; width: 200px; padding-right:4px 20px 4px 0"></td>
<td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">0</td>
<td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">
@@ -105,7 +105,7 @@
<a style="font-weight: bold; text-decoration: none; font-size:14px; line-height:20px; color: #0073B1;" href="http://localhost:8080/dashboard/app/#/rootcause?anomalyId=7"
target="_blank">(view)</a>
</td>
- <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">Data Missing</td>
+ <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">SLA Violation</td>
<td style="word-break: break-all; width: 200px; padding-right:4px 20px 4px 0"></td>
<td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">0</td>
<td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">
@@ -121,7 +121,7 @@
<a style="font-weight: bold; text-decoration: none; font-size:14px; line-height:20px; color: #0073B1;" href="http://localhost:8080/dashboard/app/#/rootcause?anomalyId=8"
target="_blank">(view)</a>
</td>
- <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">Data Missing</td>
+ <td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">SLA Violation</td>
<td style="word-break: break-all; width: 200px; padding-right:4px 20px 4px 0"></td>
<td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">0</td>
<td style="color: rgba(0,0,0,0.9); font-size:14px; line-height:20px; text-align:center;">