You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/05/20 17:20:50 UTC

[GitHub] [incubator-pinot] xiaohui-sun commented on a change in pull request #5398: [TE] Added generic support for configuring Data Quality rules like SLA from Yaml

xiaohui-sun commented on a change in pull request #5398:
URL: https://github.com/apache/incubator-pinot/pull/5398#discussion_r427627733



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityPipelineJob.java
##########
@@ -26,14 +26,16 @@
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.detection.TaskUtils;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DetectionDataSLAJob implements Job {
-  private static final Logger LOG = LoggerFactory.getLogger(DetectionDataSLAJob.class);
+public class DataQualityPipelineJob implements Job {

Review comment:
       Please add some documentation (Javadoc) to this public class.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
##########
@@ -122,9 +122,10 @@ public void run() {
             long endtime = System.currentTimeMillis();
             createDetectionTask(detectionConfig, endtime);
 
-            if (DetectionUtils.isDataAvailabilityCheckEnabled(detectionConfig)) {
-              createDataSLACheckTask(detectionConfig, endtime);
-              LOG.info("Scheduling a task for data availability {} due to the fallback mechanism.", detectionConfigId);
+            if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) {

Review comment:
       We need a unified naming convention for this alert.
   I think "DQ" (data quality) is too broad, and "availability" is overloaded because it is also used for triggering.
   Let's discuss.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.detection.spec.AbstractSpec;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DataSlaQualityCheckerSpec extends AbstractSpec {
+  private String sla = "1_DAYS";

Review comment:
       Why is it 1 day here instead of 3 days?

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.dataquality.components;
+
+import com.google.common.collect.ArrayListMultimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.thirdeye.anomaly.AnomalyType;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.PresentationOption;
+import org.apache.pinot.thirdeye.detection.dataquality.spec.DataSlaQualityCheckerSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.DetectionResult;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs data sla checks for the window and generates DATA_MISSING anomalies.
+ */
+@Components(title = "Data Sla Quality Checker",
+    type = "DATA_SLA",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Checks if data is missing or not based on the configured sla",
+    presentation = {
+        @PresentationOption(name = "data sla", template = "is ${sla}")
+    },
+    params = {
+        @Param(name = "sla", placeholder = "value")
+    })
+public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityCheckerSpec>, BaselineProvider<DataSlaQualityCheckerSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSlaQualityChecker.class);
+
+  private String sla;
+  private InputDataFetcher dataFetcher;
+  private final String DEFAULT_DATA_SLA = "3_DAYS";
+
+  @Override
+  public DetectionResult runDetection(Interval window, String metricUrn) {
+    return DetectionResult.from(runSLACheck(MetricEntity.fromURN(metricUrn), window));
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    return TimeSeries.empty();
+  }
+
+  @Override
+  public void init(DataSlaQualityCheckerSpec spec, InputDataFetcher dataFetcher) {
+    this.sla = spec.getSla();
+    this.dataFetcher = dataFetcher;
+  }
+
+  /**
+   * Runs the data sla check for the window on the given metric
+   */
+  private List<MergedAnomalyResultDTO> runSLACheck(MetricEntity me, Interval window) {
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+
+    // We want to measure the overall dataset availability (filters can be ignored)
+    long startTime = window.getStart().getMillis();
+    long endTime = window.getEnd().getMillis();
+    MetricSlice metricSlice = MetricSlice.from(me.getId(), startTime, endTime, ArrayListMultimap.<String, String>create());
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+        .withTimeseriesSlices(Collections.singletonList(metricSlice))
+        .withMetricIdsForDataset(Collections.singletonList(me.getId()))
+        .withMetricIds(Collections.singletonList(me.getId())));
+    DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
+
+    try {
+      long datasetLastRefreshTime = datasetConfig.getLastRefreshTime();
+      if (datasetLastRefreshTime <= 0) {
+        // no availability event -> assume we have processed data till the current detection start
+        datasetLastRefreshTime = startTime - 1;
+      }
+
+      MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+      if (isMissingData(datasetLastRefreshTime, startTime)) {
+        // Double check with data source as 2 things are possible.
+        // 1. This dataset/source may not support availability events
+        // 2. The data availability event pipeline has some issue.
+
+        DataFrame dataFrame = data.getTimeseries().get(metricSlice);
+        if (dataFrame == null || dataFrame.isEmpty()) {
+          // no data
+          if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+            anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+          }
+        } else {
+          datasetLastRefreshTime = dataFrame.getDoubles("timestamp").max().longValue();
+          if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+            if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+              slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, endTime);
+              anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+            }
+          }
+        }
+      } else if (isPartialData(datasetLastRefreshTime, endTime, datasetConfig)) {
+        // Optimize for the common case - the common case is that the data availability events are arriving
+        // correctly and we need not re-fetch the data to double check.
+        if (hasMissedSLA(datasetLastRefreshTime, endTime)) {
+          anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+        }
+      }
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e);
+    }

Review comment:
       Why is the logic so complicated?
   Can we just compare the last data time against the expected refresh time?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org