Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/05/15 23:34:52 UTC

[GitHub] [incubator-pinot] akshayrai opened a new pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

akshayrai opened a new pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398


   **Overall Updates:**
   * Added the ability to configure Data Quality (DQ) alerts from YAML. See the sample below.
   * The DQ pipeline leverages and follows the existing detection merging logic.
   * Refactored the code to modularize and reuse the detection translators, and extended them to data quality translators.
   * Updated the DATA_QUALITY Task and Pipeline Runner.
   * Added the following metrics for tracking: # DQ Tasks, # DQ Successful Tasks & # DQ Failed Tasks.
   
   Doc Link: To Be Updated
   
   **Tests:**
   * Added unit tests covering the translation of DQ YAML into the internal JSON representation.
   * Updated unit tests to validate the Data SLA checker.
   * Tested the Data Quality/SLA pipeline by deploying it locally.
   
   **Example:**
   ```
   ...
   rules:
   - quality:
     - type: DATA_SLA
       name: slaRule1
       params:
         sla: 1_DAYS
   ```
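   
   For context, here is an illustrative sketch of how the quality section can sit alongside the existing detection rules in a full alert config. The detectionName/metric/dataset values and the THRESHOLD rule below are assumed placeholders, not part of this change; only the quality block mirrors the sample above.
   ```
   # Illustrative sketch - top-level fields and the detection rule are assumed placeholders
   detectionName: sample_alert_with_data_sla
   metric: page_views
   dataset: page_views_dataset
   rules:
   - detection:
     - type: THRESHOLD
       name: maxThresholdRule1
       params:
         max: 1000
     quality:
     - type: DATA_SLA
       name: slaRule1
       params:
         sla: 1_DAYS
   ```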
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] akshayrai edited a comment on pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
akshayrai edited a comment on pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#issuecomment-631661326


   > Can we treat the quality rule the same as other rules? Then we can remove the sub level for "quality". @akshayrai
   
   @xiaohui-sun since the detection pipeline and the quality pipeline are treated as two logically separate and independent pipelines (running separately), we added the quality section so that they can be parsed into separate pipeline configs.
   
   Overall, if we tie them up, I expect the flow to run in this order:
   quality rules -> detection rules -> filter rules -> tagging rules, etc.




[GitHub] [incubator-pinot] akshayrai commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
akshayrai commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428856659



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
##########
@@ -79,28 +81,45 @@ public void run() {
 
       // add or update
       for (DetectionConfigDTO config : configs) {
-        JobKey key = new JobKey(getJobKey(config.getId()), TaskConstants.TaskType.DETECTION.toString());
         if (!config.isActive()) {
-          LOG.debug("Detection config " + key + " is inactive. Skipping.");
+          LOG.debug("Detection config " + config.getId() + " is inactive. Skipping.");
           continue;
         }
-        if (DetectionUtils.isDataAvailabilityCheckEnabled(config)) {
-          LOG.debug("Detection config " + key + " is enabled for data availability scheduling. Skipping.");
+        if (config.isDataAvailabilitySchedule()) {
+          LOG.debug("Detection config " + config.getId() + " is enabled for data availability scheduling. Skipping.");
           continue;
         }
+
         try {
-          if (scheduler.checkExists(key)) {
-            LOG.info("Detection config  " + key.getName() + " is already scheduled");
-            if (isJobUpdated(config, key)) {
-              // restart job
-              stopJob(key);
-              startJob(config, key);
+          // Schedule detection jobs
+          JobKey detectionJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DETECTION),
+              QUARTZ_DETECTION_GROUPER);
+          JobDetail detectionJob = JobBuilder.newJob(DetectionPipelineJob.class).withIdentity(detectionJobKey).build();
+          if (scheduler.checkExists(detectionJobKey)) {
+            LOG.info("Detection config " + detectionJobKey.getName() + " is already scheduled for detection");
+            if (isJobUpdated(config, detectionJobKey)) {
+              restartJob(config, detectionJob);
+            }
+          } else {
+            startJob(config, detectionJob);
+          }
+
+          // Schedule data quality jobs
+          JobKey dataQualityJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DATA_QUALITY),
+              QUARTZ_DETECTION_GROUPER);
+          JobDetail dataQualityJob = JobBuilder.newJob(DataQualityPipelineJob.class).withIdentity(dataQualityJobKey).build();

Review comment:
        By design, the detection config supports only one cron. If we need a different cron, we can set up another alert.
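        
        As a rough sketch of what that means in the YAML (the cron value below is an assumed example, not taken from this PR): the single cron configured on the alert drives both the DETECTION and the DATA_QUALITY jobs created by the scheduler, so a different schedule for the quality checks would require a separate alert.
        ```
        # Illustrative only - cron value and rule names are assumed placeholders
        cron: "0 0 14 * * ? *"
        rules:
        - quality:
          - type: DATA_SLA
            name: slaRule1
            params:
              sla: 1_DAYS
        ```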

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.wrapper;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
+import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+
+
+/**
+ * The Data Quality Merge Wrapper. This merge wrapper is specifically designed keeping the sla anomalies in mind.
+ * We might need to revisit this when more data quality rules are added. Fundamentally, the data sla anomalies are never
+ * merged as we want to keep re-notifying users if the sla is missed after every detection. This merger will ensure no
+ * duplicate sla anomalies are created if the detection runs more frequently and will serve as a placeholder for future
+ * merging logic.
+ */
+public class DataQualityMergeWrapper extends MergeWrapper {
+  private static final String PROP_GROUP_KEY = "groupKey";
+
+  public DataQualityMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+    super(provider, config, startTime, endTime);
+  }
+
+  @Override
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    AnomalySlice effectiveSlice = this.slice.withDetectionId(this.config.getId())
+        .withStart(this.getStartTime(generated) - this.maxGap - 1)
+        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+    Collection<MergedAnomalyResultDTO> anomalies =
+        this.provider.fetchAnomalies(Collections.singleton(effectiveSlice)).get(effectiveSlice);
+
+    return anomalies.stream().filter(anomaly -> !anomaly.isChild()).collect(Collectors.toList());

Review comment:
        The merger already has an anomaly key, but it makes sense to filter these out explicitly. Updated here and in the ChildKeepingMerger.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigTranslatorBuilder.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
+
+
+/**
+ * This is the root of the detection config translator builder. Other translators
+ * extend from this class.
+ */
+public abstract class DetectionConfigTranslatorBuilder {
+
+  public static final String PROP_SUB_ENTITY_NAME = "subEntityName";
+  public static final String PROP_DIMENSION_EXPLORATION = "dimensionExploration";
+  public static final String PROP_FILTERS = "filters";
+
+  static final String PROP_DETECTION = "detection";
+  static final String PROP_FILTER = "filter";
+  static final String PROP_QUALITY = "quality";
+  static final String PROP_TYPE = "type";
+  static final String PROP_CLASS_NAME = "className";
+  static final String PROP_PARAMS = "params";
+  static final String PROP_METRIC_URN = "metricUrn";
+  static final String PROP_DIMENSION_FILTER_METRIC = "dimensionFilterMetric";
+  static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
+  static final String PROP_RULES = "rules";
+  static final String PROP_GROUPER = "grouper";
+  static final String PROP_NESTED = "nested";
+  static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
+  static final String PROP_DETECTOR = "detector";
+  static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
+  static final String PROP_WINDOW_DELAY = "windowDelay";
+  static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+  static final String PROP_WINDOW_SIZE = "windowSize";
+  static final String PROP_WINDOW_UNIT = "windowUnit";
+  static final String PROP_FREQUENCY = "frequency";
+  static final String PROP_MERGER = "merger";
+  static final String PROP_TIMEZONE = "timezone";
+  static final String PROP_NAME = "name";
+
+  static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
+  static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+  static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
+
+  static final String PROP_ALERTS = "alerts";
+  static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
+
+  final DetectionMetricAttributeHolder metricAttributesMap;
+  final DataProvider dataProvider;
+  static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
+
+  DetectionConfigTranslatorBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider dataProvider) {

Review comment:
       Renamed them to `...PropertiesBuilder`






[GitHub] [incubator-pinot] xiaohui-sun commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
xiaohui-sun commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428409078



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_MISSING anomalies.
+ */
+@Components(title = "Data Sla Quality Checker",
+    type = "DATA_SLA",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Checks if data is missing or not based on the configured sla",
+    presentation = {
+        @PresentationOption(name = "data sla", template = "is ${sla}")
+    },
+    params = {
+        @Param(name = "sla", placeholder = "value")
+    })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+  private String sla;
+  private InputDataFetcher dataFetcher;
+  private final String DEFAULT_DATA_SLA = "3_DAYS";
+
+  @Override
+  public DetectionResult runDetection(Interval window, String metricUrn) {
+    return DetectionResult.from(runSLACheck(MetricEntity.fromURN(metricUrn), window));
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    return TimeSeries.empty();
+  }
+
+  @Override
+  public void init(DataSlaQualityCheckerSpec spec, InputDataFetcher dataFetcher) {
+    this.sla = spec.getSla();
+    this.dataFetcher = dataFetcher;
+  }
+
+  /**
+   * Runs the data sla check for the window on the given metric
+   */
+  private List<MergedAnomalyResultDTO> runSLACheck(MetricEntity me, Interval window) {
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+
+    // We want to measure the overall dataset availability (filters can be ignored)
+    long startTime = window.getStart().getMillis();
+    long endTime = window.getEnd().getMillis();
+    MetricSlice metricSlice = MetricSlice.from(me.getId(), startTime, endTime, ArrayListMultimap.<String, String>create());
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+        .withTimeseriesSlices(Collections.singletonList(metricSlice))
+        .withMetricIdsForDataset(Collections.singletonList(me.getId()))
+        .withMetricIds(Collections.singletonList(me.getId())));
+    DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+
+    try {
+      long datasetLastRefreshTime = datasetConfig.getLastRefreshTime();
+      if (datasetLastRefreshTime <= 0) {
+        // no availability event -> assume we have processed data till the current detection start
+        datasetLastRefreshTime = startTime - 1;
+      }
+
+      MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+      if (isMissingData(datasetLastRefreshTime, startTime)) {
+        // Double check with data source as 2 things are possible.
+        // 1. This dataset/source may not support availability events
+        // 2. The data availability event pipeline has some issue.
+
+        DataFrame dataFrame = data.getTimeseries().get(metricSlice);
+        if (dataFrame == null || dataFrame.isEmpty()) {
+          // no data
+          if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+            anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+          }
+        } else {
+          datasetLastRefreshTime = dataFrame.getDoubles("timestamp").max().longValue();
+          if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+            if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+              slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+              anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+            }
+          }
+        }
+      } else if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+        // Optimize for the common case - the common case is that the data availability events are arriving
+        // correctly and we need not re-fetch the data to double check.
+        if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+          anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+        }
+      }
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
+    }

Review comment:
        Thanks. That looks much clearer.






[GitHub] [incubator-pinot] akshayrai commented on pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
akshayrai commented on pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#issuecomment-631661326


   > Can we treat the quality rule the same as other rules? Then we can remove the sub level for "quality". @akshayrai
   
   @xiaohui-sun since the detection pipeline and the quality pipeline are treated as two logically separate and independent pipelines (running separately), we added the quality section so that they can be parsed into separate pipeline configs.




[GitHub] [incubator-pinot] akshayrai commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
akshayrai commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428922121



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_SLA anomalies.
+ *
+ * Data SLA is verified based on the following information.
+ * a. The dataset refresh timestamp updated by the event based data availability pipeline (if applicable).
+ * b. Otherwise, we will query the data source and run sla checks.
+ */
+@Components(title = "Data Sla Quality Checker",
+    type = "DATA_SLA",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Checks if data is missing or not based on the configured sla",
+    presentation = {
+        @PresentationOption(name = "data sla", template = "is ${sla}")
+    },
+    params = {
+        @Param(name = "sla", placeholder = "value")
+    })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+  private String sla;
+  private InputDataFetcher dataFetcher;
+  private final String DEFAULT_DATA_SLA = "3_DAYS";

Review comment:
        I kept it this way because the sla parameter is configured by the user in the same format in their YAML config.
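        
        For reference, a minimal sketch of overriding the default in the YAML, using the same `<count>_<unit>` string format as the `3_DAYS` default (the exact unit names are an assumption here, e.g. DAYS/HOURS):
        ```
        - quality:
          - type: DATA_SLA
            name: slaRule1
            params:
              sla: 2_DAYS   # assumed example; same "<count>_<unit>" format as the 3_DAYS default
        ```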






[GitHub] [incubator-pinot] akshayrai edited a comment on pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
akshayrai edited a comment on pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#issuecomment-631661326


   > Can we treat the quality rule the same as other rules? Then we can remove the sub level for "quality". @akshayrai
   
   @xiaohui-sun since the detection pipeline and the quality pipeline are treated as two logically separate and independent pipelines (running separately), we added the quality section so that they can be parsed into separate pipeline configs.
   
   Overall, I expect the flow to run in this order:
   quality rules -> detection rules -> filter rules -> tagging rules, etc.




[GitHub] [incubator-pinot] jihaozh commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
jihaozh commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428391511



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/scheduler/DetectionCronScheduler.java
##########
@@ -79,28 +81,45 @@ public void run() {
 
       // add or update
       for (DetectionConfigDTO config : configs) {
-        JobKey key = new JobKey(getJobKey(config.getId()), TaskConstants.TaskType.DETECTION.toString());
         if (!config.isActive()) {
-          LOG.debug("Detection config " + key + " is inactive. Skipping.");
+          LOG.debug("Detection config " + config.getId() + " is inactive. Skipping.");
           continue;
         }
-        if (DetectionUtils.isDataAvailabilityCheckEnabled(config)) {
-          LOG.debug("Detection config " + key + " is enabled for data availability scheduling. Skipping.");
+        if (config.isDataAvailabilitySchedule()) {
+          LOG.debug("Detection config " + config.getId() + " is enabled for data availability scheduling. Skipping.");
           continue;
         }
+
         try {
-          if (scheduler.checkExists(key)) {
-            LOG.info("Detection config  " + key.getName() + " is already scheduled");
-            if (isJobUpdated(config, key)) {
-              // restart job
-              stopJob(key);
-              startJob(config, key);
+          // Schedule detection jobs
+          JobKey detectionJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DETECTION),
+              QUARTZ_DETECTION_GROUPER);
+          JobDetail detectionJob = JobBuilder.newJob(DetectionPipelineJob.class).withIdentity(detectionJobKey).build();
+          if (scheduler.checkExists(detectionJobKey)) {
+            LOG.info("Detection config " + detectionJobKey.getName() + " is already scheduled for detection");
+            if (isJobUpdated(config, detectionJobKey)) {
+              restartJob(config, detectionJob);
+            }
+          } else {
+            startJob(config, detectionJob);
+          }
+
+          // Schedule data quality jobs
+          JobKey dataQualityJobKey = new JobKey(getJobKey(config.getId(), TaskConstants.TaskType.DATA_QUALITY),
+              QUARTZ_DETECTION_GROUPER);
+          JobDetail dataQualityJob = JobBuilder.newJob(DataQualityPipelineJob.class).withIdentity(dataQualityJobKey).build();

Review comment:
        Will there be a case where the detection and the data quality check have different crons?






[GitHub] [incubator-pinot] akshayrai merged pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
akshayrai merged pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398


   




[GitHub] [incubator-pinot] xiaohui-sun commented on pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
xiaohui-sun commented on pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#issuecomment-630590642


   Can we treat the quality rule the same as other rules? Then we can remove the sub level for "quality". @akshayrai 
   
   




[GitHub] [incubator-pinot] jihaozh commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
jihaozh commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428388089



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/DataQualityMergeWrapper.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.wrapper;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
+import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+
+
+/**
+ * The Data Quality Merge Wrapper. This merge wrapper is specifically designed keeping the sla anomalies in mind.
+ * We might need to revisit this when more data quality rules are added. Fundamentally, the data sla anomalies are never
+ * merged as we want to keep re-notifying users if the sla is missed after every detection. This merger will ensure no
+ * duplicate sla anomalies are created if the detection runs more frequently and will serve as a placeholder for future
+ * merging logic.
+ */
+public class DataQualityMergeWrapper extends MergeWrapper {
+  private static final String PROP_GROUP_KEY = "groupKey";
+
+  public DataQualityMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
+    super(provider, config, startTime, endTime);
+  }
+
+  @Override
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    AnomalySlice effectiveSlice = this.slice.withDetectionId(this.config.getId())
+        .withStart(this.getStartTime(generated) - this.maxGap - 1)
+        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+    Collection<MergedAnomalyResultDTO> anomalies =
+        this.provider.fetchAnomalies(Collections.singleton(effectiveSlice)).get(effectiveSlice);
+
+    return anomalies.stream().filter(anomaly -> !anomaly.isChild()).collect(Collectors.toList());

Review comment:
        Should this merger only retrieve and handle anomalies of the SLA type?

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/builder/DetectionConfigTranslatorBuilder.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.yaml.translator.builder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.EntityAnomalyMergeWrapper;
+import org.apache.pinot.thirdeye.detection.wrapper.GrouperWrapper;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionMetricAttributeHolder;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+
+import static org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner.*;
+
+
+/**
+ * This is the root of the detection config translator builder. Other translators
+ * extend from this class.
+ */
+public abstract class DetectionConfigTranslatorBuilder {
+
+  public static final String PROP_SUB_ENTITY_NAME = "subEntityName";
+  public static final String PROP_DIMENSION_EXPLORATION = "dimensionExploration";
+  public static final String PROP_FILTERS = "filters";
+
+  static final String PROP_DETECTION = "detection";
+  static final String PROP_FILTER = "filter";
+  static final String PROP_QUALITY = "quality";
+  static final String PROP_TYPE = "type";
+  static final String PROP_CLASS_NAME = "className";
+  static final String PROP_PARAMS = "params";
+  static final String PROP_METRIC_URN = "metricUrn";
+  static final String PROP_DIMENSION_FILTER_METRIC = "dimensionFilterMetric";
+  static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
+  static final String PROP_RULES = "rules";
+  static final String PROP_GROUPER = "grouper";
+  static final String PROP_NESTED = "nested";
+  static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
+  static final String PROP_DETECTOR = "detector";
+  static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
+  static final String PROP_WINDOW_DELAY = "windowDelay";
+  static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+  static final String PROP_WINDOW_SIZE = "windowSize";
+  static final String PROP_WINDOW_UNIT = "windowUnit";
+  static final String PROP_FREQUENCY = "frequency";
+  static final String PROP_MERGER = "merger";
+  static final String PROP_TIMEZONE = "timezone";
+  static final String PROP_NAME = "name";
+
+  static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
+  static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+  static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
+
+  static final String PROP_ALERTS = "alerts";
+  static final String COMPOSITE_ALERT = "COMPOSITE_ALERT";
+
+  final DetectionMetricAttributeHolder metricAttributesMap;
+  final DataProvider dataProvider;
+  static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
+
+  DetectionConfigTranslatorBuilder(DetectionMetricAttributeHolder metricAttributesMap, DataProvider dataProvider) {

Review comment:
        nit: Why is it called `DetectionConfigTranslatorBuilder` instead of `DetectionConfigBuilder` or `DetectionPropertyBuilder`? It seems the result it produces is a config property, not a translator.






[GitHub] [incubator-pinot] akshayrai commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
akshayrai commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428376797



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.detection.spec.AbstractSpec;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DataSlaQualityCheckerSpec extends AbstractSpec {
+  private String sla = "1_DAYS";

Review comment:
       updated

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_MISSING anomalies.
+ */
+@Components(title = "Data Sla Quality Checker",
+    type = "DATA_SLA",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Checks if data is missing or not based on the configured sla",
+    presentation = {
+        @PresentationOption(name = "data sla", template = "is ${sla}")
+    },
+    params = {
+        @Param(name = "sla", placeholder = "value")
+    })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+  private String sla;
+  private InputDataFetcher dataFetcher;
+  private final String DEFAULT_DATA_SLA = "3_DAYS";
+
+  @Override
+  public DetectionResult runDetection(Interval window, String metricUrn) {
+    return DetectionResult.from(runSLACheck(MetricEntity.fromURN(metricUrn), window));
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    return TimeSeries.empty();
+  }
+
+  @Override
+  public void init(DataSlaQualityCheckerSpec spec, InputDataFetcher dataFetcher) {
+    this.sla = spec.getSla();
+    this.dataFetcher = dataFetcher;
+  }
+
+  /**
+   * Runs the data sla check for the window on the given metric
+   */
+  private List<MergedAnomalyResultDTO> runSLACheck(MetricEntity me, Interval window) {
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+
+    // We want to measure the overall dataset availability (filters can be ignored)
+    long startTime = window.getStart().getMillis();
+    long endTime = window.getEnd().getMillis();
+    MetricSlice metricSlice = MetricSlice.from(me.getId(), startTime, endTime, ArrayListMultimap.<String, String>create());
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+        .withTimeseriesSlices(Collections.singletonList(metricSlice))
+        .withMetricIdsForDataset(Collections.singletonList(me.getId()))
+        .withMetricIds(Collections.singletonList(me.getId())));
+    DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+
+    try {
+      long datasetLastRefreshTime = datasetConfig.getLastRefreshTime();
+      if (datasetLastRefreshTime <= 0) {
+        // no availability event -> assume we have processed data till the current detection start
+        datasetLastRefreshTime = startTime - 1;
+      }
+
+      MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+      if (isMissingData(datasetLastRefreshTime, startTime)) {
+        // Double check with data source as 2 things are possible.
+        // 1. This dataset/source may not support availability events
+        // 2. The data availability event pipeline has some issue.
+
+        DataFrame dataFrame = data.getTimeseries().get(metricSlice);
+        if (dataFrame == null || dataFrame.isEmpty()) {
+          // no data
+          if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+            anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+          }
+        } else {
+          datasetLastRefreshTime = dataFrame.getDoubles("timestamp").max().longValue();
+          if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+            if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+              slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+              anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+            }
+          }
+        }
+      } else if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+        // Optimize for the common case - the common case is that the data availability events are arriving
+        // correctly and we need not re-fetch the data to double check.
+        if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+          anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+        }
+      }
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
+    }

Review comment:
        Not quite! We do have that core logic, but the complexity comes from dealing with "partial data" while also trying to optimize for the common case (reducing unnecessary queries to the data source).
   
   I have updated this section of code to make it more readable. Please review. Also, this section of code comes from the previous PR (it shows up here due to refactoring).
   
   
   
   
   
   

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
##########
@@ -122,9 +122,10 @@ public void run() {
             long endtime = System.currentTimeMillis();
             createDetectionTask(detectionConfig, endtime);
 
-            if (DetectionUtils.isDataAvailabilityCheckEnabled(detectionConfig)) {
-              createDataSLACheckTask(detectionConfig, endtime);
-              LOG.info("Scheduling a task for data availability {} due to the fallback mechanism.", detectionConfigId);
+            if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) {

Review comment:
       Here are the conventions I am following.
   * "Data Availability" refers to the event trigger pipeline and refers to dataset availability. This is not referred to in the context for the alert.
   * "Detection" refers to all the time-series detection rules like ALGORITHM, THRESHOLD, PERCENTAGE etc.
   * "Data Quality" refers to all the quality rules like DATA_SLA, DATA_COMPLETENESS, etc.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
##########
@@ -26,14 +26,16 @@
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.TaskUtils;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DetectionDataSLAJob implements Job {
-  private static final Logger LOG = LoggerFactory.getLogger(DetectionDataSLAJob.class);
+public class DataQualityPipelineJob implements Job {

Review comment:
       Added






[GitHub] [incubator-pinot] xiaohui-sun commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
xiaohui-sun commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r427627733



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
##########
@@ -26,14 +26,16 @@
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.TaskUtils;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DetectionDataSLAJob implements Job {
-  private static final Logger LOG = LoggerFactory.getLogger(DetectionDataSLAJob.class);
+public class DataQualityPipelineJob implements Job {

Review comment:
        Add some documentation to this public class.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
##########
@@ -122,9 +122,10 @@ public void run() {
             long endtime = System.currentTimeMillis();
             createDetectionTask(detectionConfig, endtime);
 
-            if (DetectionUtils.isDataAvailabilityCheckEnabled(detectionConfig)) {
-              createDataSLACheckTask(detectionConfig, endtime);
-              LOG.info("Scheduling a task for data availability {} due to the fallback mechanism.", detectionConfigId);
+            if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) {

Review comment:
       We need to have a unified naming convention for this alert.
   I think DQ is too broad. Availability is overloaded as it is used for triggering as well.
   Let's discuss.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.detection.spec.AbstractSpec;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DataSlaQualityCheckerSpec extends AbstractSpec {
+  private String sla = "1_DAYS";

Review comment:
       Why is it 1 day here instead of 3 days?
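   For context, the checker quoted below declares `DEFAULT_DATA_SLA = "3_DAYS"`, so the two defaults disagree. A hypothetical way to avoid the mismatch would be to define the default once and reference it from both the spec and the checker (sketch only, not part of this PR; the class name is made up):

```java
// Hypothetical constants holder so the spec and the checker share a single default SLA.
public final class DataSlaDefaults {
  public static final String DEFAULT_DATA_SLA = "3_DAYS";

  private DataSlaDefaults() {
    // no instances
  }
}
```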

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_MISSING anomalies.
+ */
+@Components(title = "Data Sla Quality Checker",
+    type = "DATA_SLA",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Checks if data is missing or not based on the configured sla",
+    presentation = {
+        @PresentationOption(name = "data sla", template = "is ${sla}")
+    },
+    params = {
+        @Param(name = "sla", placeholder = "value")
+    })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+  private String sla;
+  private InputDataFetcher dataFetcher;
+  private final String DEFAULT_DATA_SLA = "3_DAYS";
+
+  @Override
+  public DetectionResult runDetection(Interval window, String metricUrn) {
+    return DetectionResult.from(runSLACheck(MetricEntity.fromURN(metricUrn), window));
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    return TimeSeries.empty();
+  }
+
+  @Override
+  public void init(DataSlaQualityCheckerSpec spec, InputDataFetcher dataFetcher) {
+    this.sla = spec.getSla();
+    this.dataFetcher = dataFetcher;
+  }
+
+  /**
+   * Runs the data sla check for the window on the given metric
+   */
+  private List<MergedAnomalyResultDTO> runSLACheck(MetricEntity me, Interval window) {
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+
+    // We want to measure the overall dataset availability (filters can be ignored)
+    long startTime = window.getStart().getMillis();
+    long endTime = window.getEnd().getMillis();
+    MetricSlice metricSlice = MetricSlice.from(me.getId(), startTime, endTime, ArrayListMultimap.<String, String>create());
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+        .withTimeseriesSlices(Collections.singletonList(metricSlice))
+        .withMetricIdsForDataset(Collections.singletonList(me.getId()))
+        .withMetricIds(Collections.singletonList(me.getId())));
+    DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+
+    try {
+      long datasetLastRefreshTime = datasetConfig.getLastRefreshTime();
+      if (datasetLastRefreshTime <= 0) {
+        // no availability event -> assume we have processed data till the current detection start
+        datasetLastRefreshTime = startTime - 1;
+      }
+
+      MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+      if (isMissingData(datasetLastRefreshTime, startTime)) {
+        // Double check with data source as 2 things are possible.
+        // 1. This dataset/source may not support availability events
+        // 2. The data availability event pipeline has some issue.
+
+        DataFrame dataFrame = data.getTimeseries().get(metricSlice);
+        if (dataFrame == null || dataFrame.isEmpty()) {
+          // no data
+          if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+            anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+          }
+        } else {
+          datasetLastRefreshTime = dataFrame.getDoubles("timestamp").max().longValue();
+          if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+            if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+              slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+              anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+            }
+          }
+        }
+      } else if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+        // Optimize for the common case - the common case is that the data availability events are arriving
+        // correctly and we need not re-fetch the data to double check.
+        if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+          anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+        }
+      }
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
+    }

Review comment:
       Why is the logic so complicated?
   Can we just compare the dataset's last refresh time against the expected refresh time?
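   A minimal sketch of that simpler comparison (hypothetical; it assumes the configured SLA has already been parsed into a Joda-Time `Period`, which is not how this PR handles it):

```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;

public class SlaCheckSketch {
  // Sketch only: the SLA is breached when the dataset's last refresh time is older
  // than the expected refresh time (detection window end minus the SLA period).
  public static boolean hasMissedSla(long datasetLastRefreshTime, long detectionEndTime, Period slaPeriod) {
    long expectedRefreshTime = new DateTime(detectionEndTime, DateTimeZone.UTC).minus(slaPeriod).getMillis();
    return datasetLastRefreshTime < expectedRefreshTime;
  }
}
```

   The extra branching in the diff above appears to exist because the availability event may be missing or stale, so the code double-checks against the data source before raising an anomaly.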




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] vincentchenjl commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
vincentchenjl commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428858170



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_SLA anomalies.
+ *
+ * Data SLA is verified based on the following information.
+ * a. The dataset refresh timestamp updated by the event based data availability pipeline (if applicable).
+ * b. Otherwise, we will query the data source and run sla checks.
+ */
+@Components(title = "Data Sla Quality Checker",
+    type = "DATA_SLA",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Checks if data is missing or not based on the configured sla",
+    presentation = {
+        @PresentationOption(name = "data sla", template = "is ${sla}")
+    },
+    params = {
+        @Param(name = "sla", placeholder = "value")
+    })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+  private String sla;
+  private InputDataFetcher dataFetcher;
+  private final String DEFAULT_DATA_SLA = "3_DAYS";

Review comment:
       Should we use Duration or TimeGranularity instead of String, since we are converting it anyway?
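   A hypothetical sketch of that shape, keeping the parsed value in the spec (assumes YAML/JSON binding for a Joda `Period` is configured, e.g. via a Joda Jackson module; the PR keeps the raw string instead):

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.pinot.thirdeye.detection.spec.AbstractSpec;
import org.joda.time.Period;

@JsonIgnoreProperties(ignoreUnknown = true)
public class DataSlaQualityCheckerSpec extends AbstractSpec {
  // Hypothetical variant: hold the SLA as a parsed Period (default 3 days) so the
  // checker does not have to re-parse the "3_DAYS"-style string on every run.
  private Period sla = Period.days(3);

  public Period getSla() {
    return sla;
  }

  public void setSla(Period sla) {
    this.sla = sla;
  }
}
```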




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] xiaohui-sun commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

Posted by GitBox <gi...@apache.org>.
xiaohui-sun commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r428408973



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
##########
@@ -122,9 +122,10 @@ public void run() {
             long endtime = System.currentTimeMillis();
             createDetectionTask(detectionConfig, endtime);
 
-            if (DetectionUtils.isDataAvailabilityCheckEnabled(detectionConfig)) {
-              createDataSLACheckTask(detectionConfig, endtime);
-              LOG.info("Scheduling a task for data availability {} due to the fallback mechanism.", detectionConfigId);
+            if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) {

Review comment:
       That makes sense. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org