Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:36:42 UTC

[GitHub] [incubator-pinot] jasonyanwenl opened a new pull request #5769: [TE] add anomaly detection as a service - Phase 1

jasonyanwenl opened a new pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769


   This PR creates three endpoints that provide anomaly detection as a REST API service. The endpoints live in a new class called `AnomalyDetectionResource`.
   - The 1st endpoint is `/anomaly-detection/adhoc`, which allows users to send requests with ad-hoc data (a JSON payload), runs the anomaly detection task synchronously, and returns the anomalies after the task completes. A new class called `AdhocThirdEyeDataSource` is created for this. (A rough sketch of a possible request payload appears at the end of this description.)
   - The 2nd endpoint is `/anomaly-detection/task`, which allows users to send requests to run an anomaly detection task asynchronously via existing configurations and returns a task ID.
   - The 3rd endpoint is `/anomaly-detection/{taskId}`, which allows users to query the task status.
   
   This is phase 1 of this feature. In phase 2, a new table for ad-hoc data will be created, and the logic will be optimized to shorten query time for the ad-hoc endpoint.
   
   Tests are added for some methods in `AnomalyDetectionResource`. More tests will be added in phase 2.
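   
   As a rough illustration only (not part of this PR's code), a request payload for the ad-hoc endpoint could be built with Jackson along these lines. The `data`/`columns`/`rows` field names are taken from the constants in `AnomalyDetectionResource`; the exact schema the endpoint accepts may differ.
   
   ```java
   import com.fasterxml.jackson.databind.ObjectMapper;
   import com.fasterxml.jackson.databind.node.ArrayNode;
   import com.fasterxml.jackson.databind.node.ObjectNode;
   
   public class AdhocPayloadExample {
     public static void main(String[] args) throws Exception {
       ObjectMapper mapper = new ObjectMapper();
   
       ObjectNode payload = mapper.createObjectNode();
       ObjectNode data = payload.putObject("data");
   
       // Column names of the ad-hoc time series: a time column plus one metric column.
       ArrayNode columns = data.putArray("columns");
       columns.add("date").add("metric");
   
       // Each row is one observation: [time value, metric value].
       ArrayNode rows = data.putArray("rows");
       rows.addArray().add("20200701").add(100.0);
       rows.addArray().add("20200702").add(250.0);
   
       System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(payload));
     }
   }
   ```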


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467201152



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DatasetConfigValidator.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.validators;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+
+public class DatasetConfigValidator implements ConfigValidator<DatasetConfigDTO> {
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+
+  @Override
+  public void validateConfig(DatasetConfigDTO config) throws IllegalArgumentException {
+    Preconditions.checkArgument(config.getName().startsWith(DEFAULT_DATASET_NAME));

Review comment:
       Yes, you are right! I've removed both this class and metricConfigValidator. The initial purpose of those two classes was to provide a separate place to validate the format of user-input YAML files. Currently, since this endpoint does not support rich customization options, there is not much to validate, so I had left them blank. I deleted those two validators, and we can add them back if they are needed in the future.






[GitHub] [incubator-pinot] vincentchenjl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
vincentchenjl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r465286147



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";

Review comment:
       Could we use the constants in `DetectionConfigValidator`? It would be even nicer if you could refactor these constants into a separate class.
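
       Just to sketch the refactor (class name and layout are illustrative only, not an existing ThirdEye class): pull the shared YAML field names into a single holder that both `AnomalyDetectionResource` and `DetectionConfigValidator` reference.

       ```java
       public final class OnlineDetectionConstants {
         // YAML field names shared by the resource and the validators.
         public static final String DETECTION_YAML_FIELD = "detectionName";
         public static final String DATASET_YAML_FIELD = "dataset";
         public static final String METRIC_YAML_FIELD = "metric";

         private OnlineDetectionConstants() {
           // constants holder; not meant to be instantiated
         }
       }
       ```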

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";

Review comment:
       If this constant is only used once, you probably don't need to define it at the class level.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DatasetConfigValidator.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.validators;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+
+public class DatasetConfigValidator implements ConfigValidator<DatasetConfigDTO> {

Review comment:
       Can we have a more specific name for this class, something like `OnlineDatasetConfigValidator`?

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);

Review comment:
       I agree with Jihao. Having separate CRUD endpoints makes it much easier for users to run ad-hoc detection. We can set up a cleanup mechanism that removes any online dataset/metric that has not been used for a certain period, 14 days for instance.
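
       A minimal sketch of that cleanup idea, assuming the dataset DAO exposes `findAll()`/`delete(...)` in the usual Manager style and that some "last touched" timestamp can be derived per config (the helper below is hypothetical):

       ```java
       import java.util.concurrent.TimeUnit;
       import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
       import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
       import org.apache.pinot.thirdeye.datasource.DAORegistry;

       public class OnlineEntityCleanup implements Runnable {
         private static final String ONLINE_DATASET_PREFIX = "online_dataset";
         private static final long RETENTION_MILLIS = TimeUnit.DAYS.toMillis(14);

         @Override
         public void run() {
           DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
           long cutoff = System.currentTimeMillis() - RETENTION_MILLIS;
           for (DatasetConfigDTO dataset : datasetDAO.findAll()) {
             // Only touch configs created by the online/ad-hoc flow.
             if (dataset.getName().startsWith(ONLINE_DATASET_PREFIX)
                 && lastTouchedMillis(dataset) < cutoff) {
               datasetDAO.delete(dataset);
             }
           }
         }

         // Hypothetical helper: how "last touched" is tracked (e.g. an update
         // timestamp persisted with the config) is not defined by this PR.
         private long lastTouchedMillis(DatasetConfigDTO dataset) {
           return 0L;
         }
       }
       ```

       The same loop would apply to `MetricConfigManager` for the online metrics.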

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DatasetConfigValidator.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.validators;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+
+public class DatasetConfigValidator implements ConfigValidator<DatasetConfigDTO> {
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+
+  @Override
+  public void validateConfig(DatasetConfigDTO config) throws IllegalArgumentException {
+    Preconditions.checkArgument(config.getName().startsWith(DEFAULT_DATASET_NAME));

Review comment:
       Why do we want to check this prefix? It looks like this prefix is added by our code and is guaranteed to be there, right? If we only need to validate the prefix, we should remove this class.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/MetricConfigValidator.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.validators;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+
+public class MetricConfigValidator implements ConfigValidator<MetricConfigDTO> {
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+
+  @Override
+  public void validateConfig(MetricConfigDTO config) throws IllegalArgumentException {
+    Preconditions.checkArgument(config.getName().startsWith(DEFAULT_METRIC_NAME));

Review comment:
       Can we have a more specific name for this class, something like `OnlineMetricConfigValidator`? Also, can we do more validation, such as checking that the specified metric column actually exists in the dataset?
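
       For example, a slightly richer validator could look like the sketch below. The rename and the extra checks are illustrative only; it assumes `MetricConfigDTO` exposes `getDataset()`, and a real column-existence check would need access to the dataset config, which this validator does not currently take.

       ```java
       import com.google.common.base.Preconditions;
       import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;

       public class OnlineMetricConfigValidator implements ConfigValidator<MetricConfigDTO> {
         private static final String DEFAULT_METRIC_NAME = "online_metric";

         @Override
         public void validateConfig(MetricConfigDTO config) throws IllegalArgumentException {
           // Name must be present and follow the online naming convention.
           Preconditions.checkArgument(config.getName() != null && !config.getName().isEmpty(),
               "metric name must be set");
           Preconditions.checkArgument(config.getName().startsWith(DEFAULT_METRIC_NAME),
               "online metric name must start with " + DEFAULT_METRIC_NAME);
           // The metric should reference the dataset it belongs to.
           Preconditions.checkArgument(config.getDataset() != null && !config.getDataset().isEmpty(),
               "metric must reference a dataset");
         }
       }
       ```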

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);
+
+      // Create & save task
+      taskDTO = generateTaskConfig(detectionConfigDTO.getId(), start, end);
+
+      // Polling task status
+      TaskDTO polledTaskDTO = pollingTask(taskDTO.getId());
+
+      // Task failure
+      if (polledTaskDTO.getStatus() != TaskConstants.TaskStatus.COMPLETED) {
+        LOG.warn("Task is not completed after polling: " + polledTaskDTO);
+
+        responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+
+        switch (polledTaskDTO.getStatus()) {
+        case FAILED:
+          responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+          responseMessage.put("message", "Failed to execute anomaly detection task.");

Review comment:
       What about attaching the failure stacktrace here rather than just a generic error message? The error details might help users debug their configurations.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);
+
+      // Create & save task
+      taskDTO = generateTaskConfig(detectionConfigDTO.getId(), start, end);
+
+      // Polling task status
+      TaskDTO polledTaskDTO = pollingTask(taskDTO.getId());
+
+      // Task failure
+      if (polledTaskDTO.getStatus() != TaskConstants.TaskStatus.COMPLETED) {
+        LOG.warn("Task is not completed after polling: " + polledTaskDTO);
+
+        responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+
+        switch (polledTaskDTO.getStatus()) {
+        case FAILED:
+          responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+          responseMessage.put("message", "Failed to execute anomaly detection task.");

Review comment:
       I think we can pull the failure stacktrace out of `taskDTO` and include it in the error message.
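   
   A rough sketch of what that could look like (purely illustrative; it assumes the task runner records the failure stacktrace/message on the task and that `TaskDTO` exposes it through a `getMessage()` accessor, which may not be the actual getter name):
   
   ```java
   // Hypothetical helper for the FAILED branch: copy whatever error the task
   // runner recorded on the TaskDTO into the HTTP response body.
   private Map<String, String> buildFailureMessage(TaskDTO polledTaskDTO) {
     Map<String, String> responseMessage = new HashMap<>();
     responseMessage.put("message", "Failed to execute anomaly detection task.");
     String taskError = polledTaskDTO.getMessage();  // assumed accessor
     if (taskError != null && !taskError.isEmpty()) {
       responseMessage.put("error", taskError);
     }
     return responseMessage;
   }
   ```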






[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467203515



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";

Review comment:
       Yes, I have refactored part of them. Please check this [commit](https://github.com/apache/incubator-pinot/pull/5769/commits/1591f7c71306870ac868d4ad48de83fce8d28311).






[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467200428



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);

Review comment:
       I think both approaches have some pros and cons. Personally, I think a single endpoint is easier for users:
   * The main purpose of the online AD endpoint is to provide a convenient way to run AD tasks, so the endpoint should be as simple as possible. A single endpoint is more user-friendly; a separate CRUD endpoint allows more flexibility, but it requires users to register their data before they can use the service.
   * Secondly, the online service will not accept very large payloads, so in my view sending the data with every request is not a bottleneck.
   
   I think we could provide both ways. This is phase 1 of the feature; in phase 2, we could probably add the two extra endpoints you suggested.
   
   Regarding the cleanup: in phase 1 this is a single one-call request, which is why I described it as stateless. Users will not have a separate endpoint to retrieve anomalies afterwards, so we can clean them up. In phase 2, two additional endpoints will be provided, and for those we do not need to do the cleanup.
   
   Thanks for your suggestions!
   






[GitHub] [incubator-pinot] jasonyanwenl commented on pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#issuecomment-670657377


   Hi @suvodeep-pyne, thanks again for your feedback! And sorry for the late response.
   For each of your questions:
   
   > I assume that the online worker must be on when such an API request is issued? What is the behavior if that is not the case?
   
   Yes. The online worker must be on. If it is not, the endpoint will time out while polling the task status and then return a failed response to the user.
   
   > I wanted to understand the necessity of having 2 TaskDrivers. How is an online TaskDriver different in behavior than an offline task driver? Same question for online detection task vs offline detection task.
   
   There is some unique state that the online driver needs to maintain. The main differences are:
   * When acquiring a waiting task, the online task driver should not pick up a task created by the web app (and similarly for the web-app driver). Each driver picks tasks based on the task type (DETECTION or DETECTION_ONLINE).
   * The online task driver does not retrieve the detection config from the DB. Instead, it retrieves the config from the task info. Please check this [comment](https://github.com/apache/incubator-pinot/pull/5769#discussion_r463312439) for the reason.
   * The online task driver has two extra ThirdEye metrics: `duration counter` and `tasks counter`
   
   > Is this about prioritizing online tasks? If yes, have we considered having a priority attached to a task?
   
   Currently, no priority is attached, but we do want that in the future. Dedicated prioritization is a separate topic and somewhat out of scope for my intern project given the limited time, so I did not support it now; I think we should add it in the future.
   
   I will keep your suggestions on readability in mind. They are really practical. Thanks!
   
   Thanks again for your feedback!




[GitHub] [incubator-pinot] jasonyanwenl edited a comment on pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl edited a comment on pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#issuecomment-670657377


   Hi @suvodeep-pyne, thanks again for your feedback! And sorry for the late response.
   For each of your questions:
   
   > I assume that the online worker must be on when such an API request is issued? What is the behavior if that is not the case?
   
   Yes. The online worker must be on. If it is not, the endpoint will time out while polling the task status and then return a failed response to the user.
   
   > I wanted to understand the necessity of having 2 TaskDrivers. How is an online TaskDriver different in behavior than an offline task driver? Same question for online detection task vs offline detection task.
   
   There is some unique state that the online driver needs to maintain. The main differences are:
   * When acquiring a waiting task, the online task driver should not pick up a task created by the web app (and similarly for the web-app driver). Each driver picks tasks based on the task type (DETECTION or DETECTION_ONLINE).
   * The online task driver does not retrieve the detection config from the DB. Instead, it retrieves the config from the task info. Please check this [comment](https://github.com/apache/incubator-pinot/pull/5769#discussion_r463312439) for the reason.
   * The online task driver has two extra ThirdEye metrics: `duration counter` and `tasks counter`
   
   > Is this about prioritizing online tasks? If yes, have we considered having a priority attached to a task?
   
   Currently, no priority is attached, but we do want that in the future. Dedicated prioritization is a separate topic and somewhat out of scope for my intern project given the limited time, so I did not support it now; I think we should add it in the future.
   
   I will keep your suggestions on readability in mind. They are really practical. Thanks!
   
   Thanks again for your feedback!
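   
   To make the timeout behavior mentioned above concrete, here is a minimal sketch of what the polling loop could look like (illustrative only; the 5-minute budget and the exact status checks are assumptions, not the values used in this PR):
   
   ```java
   // Poll the task until it leaves WAITING/RUNNING or an assumed 5-minute budget is spent.
   // POLLING_SLEEP_TIME and taskDAO are the fields already declared in AnomalyDetectionResource.
   private TaskDTO pollingTask(long taskId) throws InterruptedException {
     long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
     TaskDTO task = taskDAO.findById(taskId);
     while (task.getStatus() == TaskConstants.TaskStatus.WAITING
         || task.getStatus() == TaskConstants.TaskStatus.RUNNING) {
       if (System.currentTimeMillis() > deadline) {
         break;  // online worker likely offline; the caller reports a failed response
       }
       TimeUnit.SECONDS.sleep(POLLING_SLEEP_TIME);
       task = taskDAO.findById(taskId);
     }
     return task;
   }
   ```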




[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467204150



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";

Review comment:
       I have removed many of those constants and only kept the important ones. I don't think a separate constants class is necessary at this point, since there are far fewer constants now. If it is still needed, please let me know and I will add it. Thanks!






[GitHub] [incubator-pinot] jihaozh merged pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jihaozh merged pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769


   




[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r469606604



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
##########
@@ -183,8 +192,11 @@ private TaskDTO acquireTask() {
       try {
         // randomize fetching head and tail to reduce synchronized patterns across threads (and hosts)
         boolean orderAscending = System.currentTimeMillis() % 2 == 0;
+
+        // find by task type to separate online task from a normal task
+        TaskType type = this.isOnline ? TaskType.DETECTION_ONLINE : TaskType.DETECTION;
         anomalyTasks = taskDAO
-            .findByStatusOrderByCreateTime(TaskStatus.WAITING, driverConfiguration.getTaskFetchSizeCap(),
+            .findByStatusAndTypeOrderByCreateTime(TaskStatus.WAITING, type, driverConfiguration.getTaskFetchSizeCap(),

Review comment:
       Thank you so much, and really sorry about this. This is indeed an issue; I will send a PR later today.






[GitHub] [incubator-pinot] jihaozh commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jihaozh commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r463308961



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time

Review comment:
       Why hard-code that one user can only send one request at a time? Would it make more sense to allow more flexibility here, for example by controlling the overall API QPS, adding a backoff mechanism, or enforcing a per-user quota?
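   
   For illustration, a per-user quota along those lines could be as small as a guard inside the resource (a sketch only; the names and the simple in-flight counter are assumptions, not part of this PR, and it would need `java.util.concurrent.ConcurrentHashMap` and `java.util.concurrent.atomic.AtomicInteger` imports):
   
   ```java
   // Hypothetical per-user in-flight limit: reject a request if the same user
   // already has MAX_CONCURRENT_PER_USER online detection requests running.
   private final ConcurrentHashMap<String, AtomicInteger> inFlight = new ConcurrentHashMap<>();
   private static final int MAX_CONCURRENT_PER_USER = 1;  // could be made configurable
   
   private boolean tryAcquire(String user) {
     AtomicInteger count = inFlight.computeIfAbsent(user, u -> new AtomicInteger());
     if (count.incrementAndGet() > MAX_CONCURRENT_PER_USER) {
       count.decrementAndGet();
       return false;  // caller would respond with 429 Too Many Requests
     }
     return true;
   }
   
   private void release(String user) {
     AtomicInteger count = inFlight.get(user);
     if (count != null) {
       count.decrementAndGet();
     }
   }
   ```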

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);

Review comment:
       I'm worried about creating a temporary detection config and then cleaning it up afterwards. These detection configs can leak into the UI, where users could see and change them while the task is running.
   
   Since we're already persisting the task DTO, the information needed to create a detection config can just be persisted within the task info. An OnlineDetectionTaskRunner can then pick it up and build the detectionConfigDTO at run time, so there is no need to persist the DetectionConfig at all.
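   
   For example, something along these lines (a rough sketch only -- it assumes a new detectionConfigYaml field on DetectionPipelineTaskInfo and a hypothetical OnlineDetectionTaskRunner, neither of which exists in this PR):
   
   ```java
   // In the resource: serialize the translated (but unsaved) detection config into the task info.
   DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(-1L, start, end);
   taskInfo.setDetectionConfigYaml(this.yaml.dump(detectionYaml)); // hypothetical field/setter
   TaskDTO taskDTO = new TaskDTO();
   taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ONLINE);
   taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
   taskDTO.setTaskInfo(objectMapper.writeValueAsString(taskInfo));
   taskDAO.save(taskDTO);

   // In a hypothetical OnlineDetectionTaskRunner: rebuild the config at run time,
   // so nothing is ever written to the detection config table.
   DetectionPipelineTaskInfo info =
       objectMapper.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
   DetectionConfigDTO config =
       new DetectionConfigTranslator(info.getDetectionConfigYaml(), provider).translate(); // hypothetical getter
   // ... run the detection pipeline against `config` without persisting it ...
   ```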

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);

Review comment:
       Any reason for the dataset and metric DTOs to be created and then cleaned up afterwards?
   
   IMO a better approach would be to create separate CRUD endpoints for these online datasets/metrics. That would let a user create a dataset once and then run detection against it multiple times. They could also visualize it in the ThirdEye UI if they want, and remove it once it is no longer needed. With the current approach, users have to post the data to ThirdEye every time they run a detection.
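   
   Something like this is the shape I have in mind (a rough sketch only; the path and method name are illustrative and not part of this PR):
   
   ```java
   // Hypothetical one-time registration endpoint: persists the online dataset/metric
   // once and returns their ids, so later detection runs can simply reference them.
   @POST
   @Path("/online-dataset")
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
   public Response createOnlineDataset(String payload, @Auth ThirdEyePrincipal principal)
       throws Exception {
     String nameSuffix = "_" + principal.getName();
     JsonNode payloadNode = objectMapper.readTree(payload);
     DatasetConfigDTO dataset = generateDatasetConfig(payloadNode, nameSuffix);
     MetricConfigDTO metric = generateMetricConfig(payloadNode, nameSuffix);
     Map<String, Long> ids = new HashMap<>();
     ids.put("datasetId", dataset.getId());
     ids.put("metricId", metric.getId());
     return Response.ok(objectMapper.writeValueAsString(ids)).build();
   }
   ```
   
   A matching DELETE endpoint could then drop them once they are no longer needed, instead of cleaning everything up at the end of every detection call.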

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);
+
+      // Create & save task
+      taskDTO = generateTaskConfig(detectionConfigDTO.getId(), start, end);
+
+      // Polling task status
+      TaskDTO polledTaskDTO = pollingTask(taskDTO.getId());
+
+      // Task failure
+      if (polledTaskDTO.getStatus() != TaskConstants.TaskStatus.COMPLETED) {
+        LOG.warn("Task is not completed after polling: " + polledTaskDTO);
+
+        responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+
+        switch (polledTaskDTO.getStatus()) {
+        case FAILED:
+          responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+          responseMessage.put("message", "Failed to execute anomaly detection task.");
+          break;
+        case TIMEOUT:
+          responseStatus = Response.Status.REQUEST_TIMEOUT;
+          responseMessage.put("message", "Anomaly detection task timeout.");
+        default:
+          LOG.error("Error task status after polling: " + polledTaskDTO.getStatus());
+          responseMessage.put("message", "unknown task status.");
+          break;
+        }
+
+        responseMessage.put("more-info", "Error = " + polledTaskDTO.getMessage());
+
+        // Send response
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Task success
+      // Retrieve task result
+      anomalies = getAnomalies(start, end, metricConfigDTO.getName(), datasetConfigDTO.getName());
+
+      // Build success response
+      JsonNode anomalyNode = objectMapper.convertValue(anomalies, JsonNode.class);
+      JsonNode detectionConfigNode =
+          objectMapper.convertValue(detectionConfigDTO.getYaml(), JsonNode.class);
+      ObjectNode responseNode = objectMapper.createObjectNode();
+      responseNode.set(ANOMALIES_FIELD, anomalyNode);
+      responseNode.set(DETECTION_FIELD, detectionConfigNode);
+
+      responseStatus = Response.Status.OK;
+      return Response.status(responseStatus).entity(objectMapper.writeValueAsString(responseNode))
+          .build();
+    } catch (JsonProcessingException e) {
+      LOG.error("Error: {}", e.getMessage());
+      responseStatus = Response.Status.BAD_REQUEST;
+      responseMessage.put("message", "Invalid request payload");
+      processException(e, responseMessage);
+      return Response.status(responseStatus).entity(responseMessage).build();
+    } catch (Exception e) {
+      LOG.error("Error: {}", e.getMessage());
+      responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+      responseMessage.put("message", "Failed executing anomaly detection service.");
+      processException(e, responseMessage);
+      return Response.status(responseStatus).entity(responseMessage).build();
+    } finally {
+      // Online service is stateless
+      cleanStates(anomalies, taskDTO, metricConfigDTO, datasetConfigDTO, detectionConfigDTO);
+    }
+  }
+
+  void cleanExistingOnlineTask(String nameSuffix) {
+    String metricName = DEFAULT_METRIC_NAME + nameSuffix;
+    List<MetricConfigDTO> metricConfigDTOS = metricConfigDAO.findByMetricName(metricName);
+    for (MetricConfigDTO metricConfigDTO : metricConfigDTOS) {
+      metricConfigDAO.deleteById(metricConfigDTO.getId());
+      LOG.info("Deleted existing metric: {}", metricConfigDTO);
+    }
+
+    String datasetName = DEFAULT_DATASET_NAME + nameSuffix;
+    DatasetConfigDTO datasetConfigDTO = datasetConfigDAO.findByDataset(datasetName);
+    if (datasetConfigDTO != null) {
+      datasetConfigDAO.delete(datasetConfigDTO);
+      LOG.info("Deleted existing dataset: {}", datasetConfigDTO);
+    }
+
+    String detectionName = DEFAULT_DETECTION_NAME + nameSuffix;
+    List<DetectionConfigDTO> detectionConfigDTOS = detectionConfigDAO
+        .findByPredicate(Predicate.EQ(DETECTION_MYSQL_NAME_COLUMN, detectionName));
+    for (DetectionConfigDTO detectionConfigDTO : detectionConfigDTOS) {
+      detectionConfigDAO.delete(detectionConfigDTO);
+      taskDAO.deleteByPredicate(Predicate.EQ(TASK_MYSQL_NAME_COLUMN,
+          TaskConstants.TaskType.DETECTION.name() + "_" + detectionConfigDTO.getId()));
+      LOG.info("Deleted existing task with detection: {}", detectionConfigDTO);
+    }
+  }
+
+  boolean validateOnlineRequestPayload(JsonNode payloadNode) {
+    if (!payloadNode.has(DATA_FIELD))
+      return false;
+
+    JsonNode dataNode = payloadNode.get(DATA_FIELD);
+    if (!dataNode.has(COLUMNS_FIELD) || !dataNode.has(ROWS_FIELD))
+      return false;
+
+    JsonNode columnsNode = dataNode.get(COLUMNS_FIELD);
+    if (!columnsNode.isArray())
+      return false;
+
+    boolean hasTimeColumn = false, hasMetricColumn = false;
+    for (JsonNode columnNode : columnsNode) {
+      if (columnNode.textValue().equals(DEFAULT_TIME_COLUMN))
+        hasTimeColumn = true;
+      if (columnNode.textValue().equals(DEFAULT_METRIC_COLUMN))
+        hasMetricColumn = true;
+      if (hasTimeColumn && hasMetricColumn)
+        break;
+    }
+    return hasTimeColumn && hasMetricColumn;
+  }
+
+  DatasetConfigDTO generateDatasetConfig(JsonNode payloadNode, String suffix) {
+    DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
+
+    // Default configuration
+    datasetConfigDTO.setDataset(DEFAULT_DATASET_NAME + suffix);
+    datasetConfigDTO.setDimensions(DEFAULT_DIMENSIONS);
+    datasetConfigDTO.setTimeColumn(DEFAULT_TIME_COLUMN);
+    datasetConfigDTO.setTimeDuration(DEFAULT_TIME_DURATION);
+    datasetConfigDTO.setTimeUnit(DEFAULT_TIME_UNIT);
+    datasetConfigDTO.setTimeFormat(DEFAULT_TIME_FORMAT);
+    datasetConfigDTO.setTimezone(DEFAULT_TIME_ZONE);
+    datasetConfigDTO.setDataSource(ONLINE_DATASOURCE);
+
+    // Customized configuration
+    if (payloadNode.has(DATASET_FIELD)) {
+
+      Map<String, Object> datasetYaml =
+          ConfigUtils.getMap(yaml.load(payloadNode.get(DATASET_FIELD).textValue()));
+
+      if (datasetYaml.containsKey(TIME_COLUMN_YAML_FIELD)) {
+        datasetConfigDTO.setTimeColumn((String) datasetYaml.get(TIME_COLUMN_YAML_FIELD));
+      }
+      if (datasetYaml.containsKey(TIME_UNIT_YAML_FIELD)) {
+        datasetConfigDTO
+            .setTimeUnit(TimeUnit.valueOf((String) datasetYaml.get(TIME_UNIT_YAML_FIELD)));
+      }
+      if (datasetYaml.containsKey(TIME_DURATION_YAML_FIELD)) {
+        datasetConfigDTO.setTimeDuration((Integer) datasetYaml.get(TIME_DURATION_YAML_FIELD));
+      }
+      if (datasetYaml.containsKey(TIME_FORMAT_YAML_FIELD)) {
+        datasetConfigDTO.setTimeFormat((String) datasetYaml.get(TIME_FORMAT_YAML_FIELD));
+      }
+      if (datasetYaml.containsKey(TIME_ZONE_YAML_FIELD)) {
+        datasetConfigDTO.setTimezone((String) datasetYaml.get(TIME_ZONE_YAML_FIELD));
+      }
+    }
+
+    this.datasetConfigValidator.validateConfig(datasetConfigDTO);
+
+    datasetConfigDAO.save(datasetConfigDTO);
+    LOG.info("Created dataset with config {}", datasetConfigDTO);
+
+    return datasetConfigDTO;
+  }
+
+  MetricConfigDTO generateMetricConfig(JsonNode payloadNode, String suffix)
+      throws JsonProcessingException {
+    MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
+    JsonNode dataNode = payloadNode.get(DATA_FIELD);
+
+    // Default configuration
+    metricConfigDTO.setName(DEFAULT_METRIC_NAME + suffix);
+    metricConfigDTO.setDataset(DEFAULT_DATASET_NAME + suffix);
+    metricConfigDTO.setAlias(ThirdEyeUtils
+        .constructMetricAlias(DEFAULT_DATASET_NAME + suffix,
+            DEFAULT_METRIC_NAME + suffix));
+    metricConfigDTO.setDatatype(DEFAULT_DATA_TYPE);
+    metricConfigDTO.setDefaultAggFunction(MetricAggFunction.SUM);
+    metricConfigDTO.setActive(true);
+
+    // Customized configuration
+    if (payloadNode.has(METRIC_FIELD)) {
+      Map<String, Object> metricYaml =
+          ConfigUtils.getMap(yaml.load(payloadNode.get(METRIC_FIELD).textValue()));
+
+      if (metricYaml.containsKey(DATATYPE_YAML_FIELD)) {
+        metricConfigDTO
+            .setDatatype(MetricType.valueOf((String) metricYaml.get(DATATYPE_YAML_FIELD)));
+      }
+    }
+
+    // Reformat Metric column name to keep consistency with metric config
+    ArrayNode columnsNode = dataNode.withArray(COLUMNS_FIELD);
+    if (columnsNode.isArray()) {
+      int colIdx = 0;
+      for (; colIdx < columnsNode.size(); colIdx++) {
+        if (columnsNode.get(colIdx).textValue().equals(DEFAULT_METRIC_COLUMN)) {
+          break;
+        }
+      }
+      columnsNode.set(colIdx, new TextNode(DEFAULT_METRIC_NAME + suffix));
+    }
+    // TODO: should store online data into a new table
+    metricConfigDTO.setOnlineData(this.objectMapper.writeValueAsString(dataNode));
+
+    this.metricConfigValidator.validateConfig(metricConfigDTO);
+
+    metricConfigDAO.save(metricConfigDTO);
+    LOG.info("Created metric with config {}", metricConfigDTO);
+
+    return metricConfigDTO;
+  }
+
+  DetectionConfigDTO generateDetectionConfig(JsonNode payloadNode, String suffix,
+      DatasetConfigDTO datasetConfigDTO, MetricConfigDTO metricConfigDTO, long start, long end) {
+    DetectionConfigDTO detectionConfigDTO;
+    Map<String, Object> detectionYaml;
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+    if (payloadNode.has(DETECTION_FIELD)) {
+      // Customized configuration: retrieve config from user request
+      detectionYaml = ConfigUtils.getMap(yaml.load(payloadNode.get(DETECTION_FIELD).textValue()));
+    } else {
+      // Default configuration: retrieve the template from disk
+      detectionYaml =
+          ConfigUtils.getMap(yaml.load(classLoader.getResourceAsStream(TEMPLATE_DETECTION_PATH)));
+    }
+
+    // Do not support customized detection name as it is not a common use case
+    detectionYaml.put(DETECTION_YAML_FIELD, DEFAULT_DETECTION_NAME + suffix);
+    detectionYaml.put(DATASET_YAML_FIELD, datasetConfigDTO.getName());
+    detectionYaml.put(METRIC_YAML_FIELD, metricConfigDTO.getName());
+
+    detectionConfigDTO =
+        new DetectionConfigTranslator(this.yaml.dump(detectionYaml), this.provider).translate();
+    detectionConfigDTO.setCron("0 0 0 1 1 ? 2200"); // Never scheduled
+
+    // Tune the detection config - Passes the raw yaml params & injects tuned params
+    DetectionConfigTuner detectionTuner = new DetectionConfigTuner(detectionConfigDTO, provider);
+    detectionConfigDTO = detectionTuner.tune(start, end);
+
+    // Validate the detection config
+    detectionValidator.validateConfig(detectionConfigDTO);
+
+    detectionConfigDAO.save(detectionConfigDTO);
+    LOG.info("Created detection with config {}", detectionConfigDTO);
+
+    return detectionConfigDTO;
+  }
+
+  TaskDTO generateTaskConfig(long detectionConfigId, long start, long end)
+      throws JsonProcessingException {
+    TaskDTO taskDTO = new TaskDTO();
+    taskDTO.setJobName(TaskConstants.TaskType.DETECTION.toString() + "_" + detectionConfigId);
+    taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
+    taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ONLINE);
+    DetectionPipelineTaskInfo taskInfo =
+        new DetectionPipelineTaskInfo(detectionConfigId, start, end);
+    String taskInfoJson = objectMapper.writeValueAsString(taskInfo);
+    taskDTO.setTaskInfo(taskInfoJson);
+
+    taskDAO.save(taskDTO);
+    LOG.info("Created task: {}", taskDTO);
+
+    return taskDTO;
+  }
+
+  private TaskDTO pollingTask(long taskId) {
+    long startTime = System.currentTimeMillis();
+    TaskDTO taskDTO;
+
+    // Timeout mechanism will be handled by worker thread in the controller
+    while (true) {
+      taskDTO = taskDAO.findById(taskId);
+
+      LOG.info("Polling task : " + taskDTO);
+
+      TaskConstants.TaskStatus taskStatus = taskDTO.getStatus();
+      if (!taskStatus.equals(TaskConstants.TaskStatus.WAITING) && !taskStatus
+          .equals(TaskConstants.TaskStatus.RUNNING)) {
+        LOG.info("Polling finished ({}ms). Task status: {}", System.currentTimeMillis() - startTime,
+            taskStatus);
+        break;
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(POLLING_SLEEP_TIME);
+      } catch (InterruptedException e) {
+        Log.warn("Interrupted during polling sleep");
+        break;
+      }
+    }
+
+    return taskDTO;
+  }
+
+  private List<AnomalySummary> getAnomalies(long start, long end, String metric, String dataset) {
+    List<AnomalySummary> anomalies =
+        this.userDashboardResource.queryAnomalies(start, end, null, null, metric,
+            dataset, null, false, null);
+
+    LOG.info("Successfully returned " + anomalies.size() + " anomalies.");
+    return anomalies;
+  }
+
+  private void cleanStates(List<AnomalySummary> anomalies, TaskDTO taskDTO,
+      MetricConfigDTO metricConfigDTO, DatasetConfigDTO datasetConfigDTO,
+      DetectionConfigDTO detectionConfigDTO) {
+    if (anomalies != null) {
+      for (AnomalySummary anomaly : anomalies) {
+        anomalyDAO.deleteById(anomaly.getId());
+        LOG.info("Deleted anomaly with id: {}", anomaly.getId());
+      }
+    }
+
+    if (datasetConfigDTO != null) {
+      datasetConfigDAO.delete(datasetConfigDTO);
+      LOG.info("Deleted dataset: {}", datasetConfigDTO);
+    }
+
+    if (metricConfigDTO != null) {
+      metricConfigDAO.delete(metricConfigDTO);
+      LOG.info("Deleted metric: {}", metricConfigDTO);
+    }
+
+    if (detectionConfigDTO != null) {
+      detectionConfigDAO.delete(detectionConfigDTO);
+      LOG.info("Deleted detection: {}", detectionConfigDTO);
+    }
+
+    if (taskDTO != null) {
+      taskDAO.delete(taskDTO);

Review comment:
       No need to delete the taskDTO; it will be cleaned up periodically by the monitor task.

##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);
+
+      // Create & save task
+      taskDTO = generateTaskConfig(detectionConfigDTO.getId(), start, end);
+
+      // Polling task status
+      TaskDTO polledTaskDTO = pollingTask(taskDTO.getId());
+
+      // Task failure
+      if (polledTaskDTO.getStatus() != TaskConstants.TaskStatus.COMPLETED) {
+        LOG.warn("Task is not completed after polling: " + polledTaskDTO);
+
+        responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+
+        switch (polledTaskDTO.getStatus()) {
+        case FAILED:
+          responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+          responseMessage.put("message", "Failed to execute anomaly detection task.");
+          break;
+        case TIMEOUT:
+          responseStatus = Response.Status.REQUEST_TIMEOUT;
+          responseMessage.put("message", "Anomaly detection task timeout.");
+        default:
+          LOG.error("Error task status after polling: " + polledTaskDTO.getStatus());
+          responseMessage.put("message", "unknown task status.");
+          break;
+        }
+
+        responseMessage.put("more-info", "Error = " + polledTaskDTO.getMessage());
+
+        // Send response
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Task success
+      // Retrieve task result
+      anomalies = getAnomalies(start, end, metricConfigDTO.getName(), datasetConfigDTO.getName());
+
+      // Build success response
+      JsonNode anomalyNode = objectMapper.convertValue(anomalies, JsonNode.class);
+      JsonNode detectionConfigNode =
+          objectMapper.convertValue(detectionConfigDTO.getYaml(), JsonNode.class);
+      ObjectNode responseNode = objectMapper.createObjectNode();
+      responseNode.set(ANOMALIES_FIELD, anomalyNode);
+      responseNode.set(DETECTION_FIELD, detectionConfigNode);
+
+      responseStatus = Response.Status.OK;
+      return Response.status(responseStatus).entity(objectMapper.writeValueAsString(responseNode))
+          .build();
+    } catch (JsonProcessingException e) {
+      LOG.error("Error: {}", e.getMessage());
+      responseStatus = Response.Status.BAD_REQUEST;
+      responseMessage.put("message", "Invalid request payload");
+      processException(e, responseMessage);
+      return Response.status(responseStatus).entity(responseMessage).build();
+    } catch (Exception e) {
+      LOG.error("Error: {}", e.getMessage());
+      responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+      responseMessage.put("message", "Failed executing anomaly detection service.");
+      processException(e, responseMessage);
+      return Response.status(responseStatus).entity(responseMessage).build();
+    } finally {
+      // Online service is stateless
+      cleanStates(anomalies, taskDTO, metricConfigDTO, datasetConfigDTO, detectionConfigDTO);
+    }
+  }
+
+  void cleanExistingOnlineTask(String nameSuffix) {
+    String metricName = DEFAULT_METRIC_NAME + nameSuffix;
+    List<MetricConfigDTO> metricConfigDTOS = metricConfigDAO.findByMetricName(metricName);
+    for (MetricConfigDTO metricConfigDTO : metricConfigDTOS) {
+      metricConfigDAO.deleteById(metricConfigDTO.getId());
+      LOG.info("Deleted existing metric: {}", metricConfigDTO);
+    }
+
+    String datasetName = DEFAULT_DATASET_NAME + nameSuffix;
+    DatasetConfigDTO datasetConfigDTO = datasetConfigDAO.findByDataset(datasetName);
+    if (datasetConfigDTO != null) {
+      datasetConfigDAO.delete(datasetConfigDTO);
+      LOG.info("Deleted existing dataset: {}", datasetConfigDTO);
+    }
+
+    String detectionName = DEFAULT_DETECTION_NAME + nameSuffix;
+    List<DetectionConfigDTO> detectionConfigDTOS = detectionConfigDAO
+        .findByPredicate(Predicate.EQ(DETECTION_MYSQL_NAME_COLUMN, detectionName));
+    for (DetectionConfigDTO detectionConfigDTO : detectionConfigDTOS) {
+      detectionConfigDAO.delete(detectionConfigDTO);
+      taskDAO.deleteByPredicate(Predicate.EQ(TASK_MYSQL_NAME_COLUMN,
+          TaskConstants.TaskType.DETECTION.name() + "_" + detectionConfigDTO.getId()));
+      LOG.info("Deleted existing task with detection: {}", detectionConfigDTO);
+    }
+  }
+
+  boolean validateOnlineRequestPayload(JsonNode payloadNode) {
+    if (!payloadNode.has(DATA_FIELD))
+      return false;
+
+    JsonNode dataNode = payloadNode.get(DATA_FIELD);
+    if (!dataNode.has(COLUMNS_FIELD) || !dataNode.has(ROWS_FIELD))
+      return false;
+
+    JsonNode columnsNode = dataNode.get(COLUMNS_FIELD);
+    if (!columnsNode.isArray())
+      return false;
+
+    boolean hasTimeColumn = false, hasMetricColumn = false;
+    for (JsonNode columnNode : columnsNode) {
+      if (columnNode.textValue().equals(DEFAULT_TIME_COLUMN))
+        hasTimeColumn = true;
+      if (columnNode.textValue().equals(DEFAULT_METRIC_COLUMN))
+        hasMetricColumn = true;
+      if (hasTimeColumn && hasMetricColumn)
+        break;
+    }
+    return hasTimeColumn && hasMetricColumn;
+  }
+
+  DatasetConfigDTO generateDatasetConfig(JsonNode payloadNode, String suffix) {
+    DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
+
+    // Default configuration
+    datasetConfigDTO.setDataset(DEFAULT_DATASET_NAME + suffix);
+    datasetConfigDTO.setDimensions(DEFAULT_DIMENSIONS);
+    datasetConfigDTO.setTimeColumn(DEFAULT_TIME_COLUMN);
+    datasetConfigDTO.setTimeDuration(DEFAULT_TIME_DURATION);
+    datasetConfigDTO.setTimeUnit(DEFAULT_TIME_UNIT);
+    datasetConfigDTO.setTimeFormat(DEFAULT_TIME_FORMAT);
+    datasetConfigDTO.setTimezone(DEFAULT_TIME_ZONE);
+    datasetConfigDTO.setDataSource(ONLINE_DATASOURCE);
+
+    // Customized configuration
+    if (payloadNode.has(DATASET_FIELD)) {
+
+      Map<String, Object> datasetYaml =
+          ConfigUtils.getMap(yaml.load(payloadNode.get(DATASET_FIELD).textValue()));
+
+      if (datasetYaml.containsKey(TIME_COLUMN_YAML_FIELD)) {
+        datasetConfigDTO.setTimeColumn((String) datasetYaml.get(TIME_COLUMN_YAML_FIELD));
+      }
+      if (datasetYaml.containsKey(TIME_UNIT_YAML_FIELD)) {
+        datasetConfigDTO
+            .setTimeUnit(TimeUnit.valueOf((String) datasetYaml.get(TIME_UNIT_YAML_FIELD)));
+      }
+      if (datasetYaml.containsKey(TIME_DURATION_YAML_FIELD)) {
+        datasetConfigDTO.setTimeDuration((Integer) datasetYaml.get(TIME_DURATION_YAML_FIELD));
+      }
+      if (datasetYaml.containsKey(TIME_FORMAT_YAML_FIELD)) {
+        datasetConfigDTO.setTimeFormat((String) datasetYaml.get(TIME_FORMAT_YAML_FIELD));
+      }
+      if (datasetYaml.containsKey(TIME_ZONE_YAML_FIELD)) {
+        datasetConfigDTO.setTimezone((String) datasetYaml.get(TIME_ZONE_YAML_FIELD));
+      }
+    }
+
+    this.datasetConfigValidator.validateConfig(datasetConfigDTO);
+
+    datasetConfigDAO.save(datasetConfigDTO);
+    LOG.info("Created dataset with config {}", datasetConfigDTO);
+
+    return datasetConfigDTO;
+  }
+
+  MetricConfigDTO generateMetricConfig(JsonNode payloadNode, String suffix)
+      throws JsonProcessingException {
+    MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
+    JsonNode dataNode = payloadNode.get(DATA_FIELD);
+
+    // Default configuration
+    metricConfigDTO.setName(DEFAULT_METRIC_NAME + suffix);
+    metricConfigDTO.setDataset(DEFAULT_DATASET_NAME + suffix);
+    metricConfigDTO.setAlias(ThirdEyeUtils
+        .constructMetricAlias(DEFAULT_DATASET_NAME + suffix,
+            DEFAULT_METRIC_NAME + suffix));
+    metricConfigDTO.setDatatype(DEFAULT_DATA_TYPE);
+    metricConfigDTO.setDefaultAggFunction(MetricAggFunction.SUM);
+    metricConfigDTO.setActive(true);
+
+    // Customized configuration
+    if (payloadNode.has(METRIC_FIELD)) {
+      Map<String, Object> metricYaml =
+          ConfigUtils.getMap(yaml.load(payloadNode.get(METRIC_FIELD).textValue()));
+
+      if (metricYaml.containsKey(DATATYPE_YAML_FIELD)) {
+        metricConfigDTO
+            .setDatatype(MetricType.valueOf((String) metricYaml.get(DATATYPE_YAML_FIELD)));
+      }
+    }
+
+    // Reformat Metric column name to keep consistency with metric config
+    ArrayNode columnsNode = dataNode.withArray(COLUMNS_FIELD);
+    if (columnsNode.isArray()) {
+      int colIdx = 0;
+      for (; colIdx < columnsNode.size(); colIdx++) {
+        if (columnsNode.get(colIdx).textValue().equals(DEFAULT_METRIC_COLUMN)) {
+          break;
+        }
+      }
+      columnsNode.set(colIdx, new TextNode(DEFAULT_METRIC_NAME + suffix));
+    }
+    // TODO: should store online data into a new table
+    metricConfigDTO.setOnlineData(this.objectMapper.writeValueAsString(dataNode));
+
+    this.metricConfigValidator.validateConfig(metricConfigDTO);
+
+    metricConfigDAO.save(metricConfigDTO);
+    LOG.info("Created metric with config {}", metricConfigDTO);
+
+    return metricConfigDTO;
+  }
+
+  DetectionConfigDTO generateDetectionConfig(JsonNode payloadNode, String suffix,
+      DatasetConfigDTO datasetConfigDTO, MetricConfigDTO metricConfigDTO, long start, long end) {
+    DetectionConfigDTO detectionConfigDTO;
+    Map<String, Object> detectionYaml;
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+    if (payloadNode.has(DETECTION_FIELD)) {
+      // Customized configuration: retrieve config from user request
+      detectionYaml = ConfigUtils.getMap(yaml.load(payloadNode.get(DETECTION_FIELD).textValue()));
+    } else {
+      // Default configuration: retrieve the template from disk
+      detectionYaml =
+          ConfigUtils.getMap(yaml.load(classLoader.getResourceAsStream(TEMPLATE_DETECTION_PATH)));
+    }
+
+    // Do not support customized detection name as it is not a common use case
+    detectionYaml.put(DETECTION_YAML_FIELD, DEFAULT_DETECTION_NAME + suffix);
+    detectionYaml.put(DATASET_YAML_FIELD, datasetConfigDTO.getName());
+    detectionYaml.put(METRIC_YAML_FIELD, metricConfigDTO.getName());
+
+    detectionConfigDTO =
+        new DetectionConfigTranslator(this.yaml.dump(detectionYaml), this.provider).translate();
+    detectionConfigDTO.setCron("0 0 0 1 1 ? 2200"); // Never scheduled
+
+    // Tune the detection config - Passes the raw yaml params & injects tuned params
+    DetectionConfigTuner detectionTuner = new DetectionConfigTuner(detectionConfigDTO, provider);
+    detectionConfigDTO = detectionTuner.tune(start, end);
+
+    // Validate the detection config
+    detectionValidator.validateConfig(detectionConfigDTO);
+
+    detectionConfigDAO.save(detectionConfigDTO);
+    LOG.info("Created detection with config {}", detectionConfigDTO);
+
+    return detectionConfigDTO;
+  }
+
+  TaskDTO generateTaskConfig(long detectionConfigId, long start, long end)
+      throws JsonProcessingException {
+    TaskDTO taskDTO = new TaskDTO();
+    taskDTO.setJobName(TaskConstants.TaskType.DETECTION.toString() + "_" + detectionConfigId);
+    taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
+    taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ONLINE);
+    DetectionPipelineTaskInfo taskInfo =
+        new DetectionPipelineTaskInfo(detectionConfigId, start, end);
+    String taskInfoJson = objectMapper.writeValueAsString(taskInfo);
+    taskDTO.setTaskInfo(taskInfoJson);
+
+    taskDAO.save(taskDTO);
+    LOG.info("Created task: {}", taskDTO);
+
+    return taskDTO;
+  }
+
+  private TaskDTO pollingTask(long taskId) {
+    long startTime = System.currentTimeMillis();
+    TaskDTO taskDTO;
+
+    // Timeout mechanism will be handled by worker thread in the controller

Review comment:
       I'll still put a timeout mechanism here so that if anything external goes wrong, the thread can still time out.
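
   For illustration, here is a minimal sketch of the polling-with-timeout idea. The status values and the `fetchStatus` hook stand in for something like `taskDAO.findById(taskId).getStatus()`; `MAX_POLLING_MILLIS` and the shortened sleep are assumptions for the demo, not the values used in the PR (the resource uses `POLLING_SLEEP_TIME`).

    import java.util.concurrent.TimeUnit;
    import java.util.function.LongFunction;

    // Poll a task until it reaches a terminal state or a hard timeout elapses,
    // so the request thread can never be stuck forever.
    public final class TaskPollingSketch {

      enum Status { WAITING, RUNNING, COMPLETED, FAILED, TIMEOUT }

      static final long MAX_POLLING_MILLIS = TimeUnit.MINUTES.toMillis(5); // assumed upper bound
      static final long POLLING_SLEEP_MILLIS = 200;                        // shortened for the demo

      static Status poll(long taskId, LongFunction<Status> fetchStatus) throws InterruptedException {
        long start = System.currentTimeMillis();
        while (true) {
          Status status = fetchStatus.apply(taskId);
          if (status == Status.COMPLETED || status == Status.FAILED) {
            return status;                      // terminal state reached
          }
          if (System.currentTimeMillis() - start > MAX_POLLING_MILLIS) {
            return Status.TIMEOUT;              // give up instead of spinning forever
          }
          Thread.sleep(POLLING_SLEEP_MILLIS);   // back off before the next status read
        }
      }

      public static void main(String[] args) throws InterruptedException {
        // Fake task that completes on the third poll, just to exercise the loop.
        int[] calls = {0};
        Status result = poll(42L, id -> ++calls[0] < 3 ? Status.RUNNING : Status.COMPLETED);
        System.out.println("Task 42 finished with status " + result);
      }
    }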






[GitHub] [incubator-pinot] jihaozh commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jihaozh commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r469601236



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
##########
@@ -183,8 +192,11 @@ private TaskDTO acquireTask() {
       try {
         // randomize fetching head and tail to reduce synchronized patterns across threads (and hosts)
         boolean orderAscending = System.currentTimeMillis() % 2 == 0;
+
+        // find by task type to separate online task from a normal task
+        TaskType type = this.isOnline ? TaskType.DETECTION_ONLINE : TaskType.DETECTION;
         anomalyTasks = taskDAO
-            .findByStatusOrderByCreateTime(TaskStatus.WAITING, driverConfiguration.getTaskFetchSizeCap(),
+            .findByStatusAndTypeOrderByCreateTime(TaskStatus.WAITING, type, driverConfiguration.getTaskFetchSizeCap(),

Review comment:
       @jasonyanwenl Sorry we missed this one earlier. Will this make the driver pick up only the Detection task type here? This could be a serious issue because it won't pick up other tasks, for example, sending out notifications, running detection onboarding, etc. Could you double-check?
   
   @vincentchenjl @akshayrai we may need to hold off on deploying to prod.
   @suvodeep-pyne thanks for reporting this
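
   To make the concern concrete, here is a sketch of one possible fix: let the online worker be exclusive to online tasks and leave the normal worker unrestricted, instead of narrowing the normal worker to a single type. The task type names and the in-memory filter are illustrative only; they are not the actual `TaskDriver`/`TaskManager` API.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // If a worker fetches WAITING tasks filtered down to one type, every other task type
    // (alerts, onboarding, ...) is silently starved. One option: only the online worker
    // filters; the normal worker takes everything that is not an online task.
    public final class TaskFetchSketch {

      enum TaskType { DETECTION, DETECTION_ALERT, YAML_DETECTION_ONBOARD, DETECTION_ONLINE }

      static final class Task {
        final long id;
        final TaskType type;
        Task(long id, TaskType type) { this.id = id; this.type = type; }
        @Override public String toString() { return "task-" + id + "(" + type + ")"; }
      }

      static List<Task> acquireWaitingTasks(List<Task> waiting, boolean isOnlineWorker) {
        return waiting.stream()
            // online worker handles only online tasks; normal worker handles all the rest
            .filter(t -> isOnlineWorker == (t.type == TaskType.DETECTION_ONLINE))
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<Task> waiting = Arrays.asList(
            new Task(1, TaskType.DETECTION),
            new Task(2, TaskType.DETECTION_ALERT),
            new Task(3, TaskType.DETECTION_ONLINE));

        System.out.println("normal worker sees: " + acquireWaitingTasks(waiting, false)); // tasks 1 and 2
        System.out.println("online worker sees: " + acquireWaitingTasks(waiting, true));  // task 3 only
      }
    }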






[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467202997



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/MetricConfigValidator.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.validators;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+
+public class MetricConfigValidator implements ConfigValidator<MetricConfigDTO> {
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+
+  @Override
+  public void validateConfig(MetricConfigDTO config) throws IllegalArgumentException {
+    Preconditions.checkArgument(config.getName().startsWith(DEFAULT_METRIC_NAME));

Review comment:
       This is removed now. Please see this [response](https://github.com/apache/incubator-pinot/pull/5769#discussion_r467201152). 
   In addition, I initially didn't support customized time/metric column names, but thanks for your suggestion, which reminded me to do so. My new commit now adds this functionality. Please check [this](https://github.com/apache/incubator-pinot/pull/5769/commits/4d37ce053af3ada1921cea53fdfe6bb1c32c850d).
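
   For readers unfamiliar with the endpoint, here is a sketch of what a request body could look like, built with Jackson. The field names ("data", "columns", "rows", "datasetConfiguration", "timezone") come from the constants in the resource diff above; the concrete values are made up, and per the quoted validator the columns still have to be named "date" and "metric" (the commit linked above relaxes that).

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    // Build an example payload for the online anomaly detection endpoint.
    public final class OnlinePayloadSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        ObjectNode payload = mapper.createObjectNode();
        ObjectNode data = payload.putObject("data");

        // columns: the time column and the metric column expected by the validator
        data.putArray("columns").add("date").add("metric");

        // rows: one [time, value] pair per day, matching the default yyyyMMdd time format
        ArrayNode rows = data.putArray("rows");
        rows.addArray().add("20200801").add(100.0);
        rows.addArray().add("20200802").add(250.0);

        // optional dataset overrides are passed as an embedded YAML string
        payload.put("datasetConfiguration", "timezone: UTC");

        System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(payload));
      }
    }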






[GitHub] [incubator-pinot] suvodeep-pyne commented on pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
suvodeep-pyne commented on pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#issuecomment-666597454


   Some thoughts:
   - It would be great if this PR can be split into smaller PRs: For example, the OnlineDataSource, TaskDriver changes and Resource changes seem like separate pieces. Correct me if I am wrong. That way it is just easier to review/understand/revert in case of some issue.
   - The `TaskDriver` runs (async) tasks. How does an online task work?
   - Also, is the resource piece directly interacting with the Task? The initial idea was to keep the anomaly detection as a backend piece. Are we calling 'backend' methods directly from Resource classes?




[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r466671941



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time

Review comment:
       Now an extra UUID suffix is appended, which allows one user to send multiple requests at a time.
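
   A minimal sketch of that naming scheme (the exact suffix format in the PR may differ; `online_dataset`, `online_metric`, and `online_detection` are the default name prefixes from the resource diff above):

    import java.util.UUID;

    // Per-request suffix: user name plus a random UUID, so two concurrent requests
    // from the same user create distinct dataset/metric/detection entities.
    public final class NameSuffixSketch {

      static String nameSuffix(String userName) {
        return "_" + userName + "_" + UUID.randomUUID();
      }

      public static void main(String[] args) {
        String suffix = nameSuffix("jasonyanwenl");
        System.out.println("online_dataset" + suffix);   // e.g. online_dataset_jasonyanwenl_6f1c...
        System.out.println("online_metric" + suffix);
        System.out.println("online_detection" + suffix);
      }
    }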






[GitHub] [incubator-pinot] jihaozh commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jihaozh commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r469601236



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
##########
@@ -183,8 +192,11 @@ private TaskDTO acquireTask() {
       try {
         // randomize fetching head and tail to reduce synchronized patterns across threads (and hosts)
         boolean orderAscending = System.currentTimeMillis() % 2 == 0;
+
+        // find by task type to separate online task from a normal task
+        TaskType type = this.isOnline ? TaskType.DETECTION_ONLINE : TaskType.DETECTION;
         anomalyTasks = taskDAO
-            .findByStatusOrderByCreateTime(TaskStatus.WAITING, driverConfiguration.getTaskFetchSizeCap(),
+            .findByStatusAndTypeOrderByCreateTime(TaskStatus.WAITING, type, driverConfiguration.getTaskFetchSizeCap(),

Review comment:
       @jasonyanwenl Sorry we missed this one earlier. Will this make the driver pick up only the Detection task type here? This could be a serious issue because it won't pick up other tasks, for example, sending out notifications, running detection onboarding, etc. Could you double-check?
   
   @vincentchenjl @akshayrai FYI, we may need to hold off on deploying to prod.
   @suvodeep-pyne thanks for reporting this






[GitHub] [incubator-pinot] jasonyanwenl commented on pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#issuecomment-672181314


   Thank you so much for the quick review. Really appreciate it!




[GitHub] [incubator-pinot] jasonyanwenl commented on pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#issuecomment-666769896


   @suvodeep-pyne Thanks for your feedback! For each of your points:
   * I think those pieces of code are related to each other, so currently there is no plan to split them into multiple PRs. In addition, there is a global flag in the configuration file that can turn this feature on/off. The next time I send a PR, I will keep this in mind.
   * The resource will keep polling the task status to make sure this service is synchronous from the user's side.
   * The resource will not directly interact with the task, and it will not directly call backend methods. The resource only creates a task and saves it into the DB; the worker thread in the backend retrieves it asynchronously (a small sketch of this flow follows below).
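
   A minimal sketch of that create-then-poll flow, with an in-memory map standing in for the task table and a plain thread standing in for the worker; none of this is the actual ThirdEye code, it only illustrates the separation described above:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // The "resource" persists a WAITING task and polls its status; an independent
    // "worker" thread picks the task up and marks it COMPLETED.
    public final class CreateThenPollSketch {

      enum Status { WAITING, COMPLETED }

      public static void main(String[] args) throws InterruptedException {
        Map<Long, Status> taskTable = new ConcurrentHashMap<>();

        // Worker: scans for WAITING tasks and executes them asynchronously.
        Thread worker = new Thread(() -> {
          while (!Thread.currentThread().isInterrupted()) {
            taskTable.replaceAll((id, s) -> s == Status.WAITING ? Status.COMPLETED : s);
            try { Thread.sleep(50); } catch (InterruptedException e) { return; }
          }
        });
        worker.setDaemon(true);
        worker.start();

        // Resource: creates the task, then polls until it reaches a terminal state.
        long taskId = 1L;
        taskTable.put(taskId, Status.WAITING);
        while (taskTable.get(taskId) != Status.COMPLETED) {
          Thread.sleep(50);
        }
        System.out.println("Task " + taskId + " completed; the resource can now fetch anomalies.");
      }
    }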




[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467201722



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/validators/DatasetConfigValidator.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.validators;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+
+public class DatasetConfigValidator implements ConfigValidator<DatasetConfigDTO> {

Review comment:
       This is removed now. Please see this [response](https://github.com/apache/incubator-pinot/pull/5769#discussion_r467201152).






[GitHub] [incubator-pinot] suvodeep-pyne commented on pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
suvodeep-pyne commented on pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#issuecomment-667299037


   Hey @jasonyanwenl 
   
   Thanks for getting back to me. Just wanted to understand some things better:
   - I assume that the online worker must be on when such an API request is issued? What is the behavior if that is not the case?
   - I wanted to understand the necessity of having 2 TaskDrivers. How is an online TaskDriver different in behavior than an offline task driver? Same question for online detection task vs offline detection task.
   - Is this about prioritizing online tasks? If yes, have we considered having a priority attached to a task?
   
   Some comments on readability that have helped me:
   - First of all thank you! I would really encourage having small PRs. It just makes it really easy to review and push commits faster if the changes are small and focussed. Imagine not having the context and reviewing a 1300 line PR. 
   - This can be across multiple files but addressing a single/few related issue(s).
   - Also, smaller methods help; same logic: easier to read, refactor, and test. And less code duplication.
   - Smaller classes. Starting with an 800-line Resource class makes it a bit hard to read and maintain.
   
   Apart this, I really like the utility of these APIs. Thanks for adding this!
   
   Regards




[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467136458



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);
+
+      // Create & save task
+      taskDTO = generateTaskConfig(detectionConfigDTO.getId(), start, end);
+
+      // Polling task status
+      TaskDTO polledTaskDTO = pollingTask(taskDTO.getId());
+
+      // Task failure
+      if (polledTaskDTO.getStatus() != TaskConstants.TaskStatus.COMPLETED) {
+        LOG.warn("Task is not completed after polling: " + polledTaskDTO);
+
+        responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+
+        switch (polledTaskDTO.getStatus()) {
+        case FAILED:
+          responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+          responseMessage.put("message", "Failed to execute anomaly detection task.");

Review comment:
       Yes. I already put that after this switch statement. Thanks!






[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467204816



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);

Review comment:
       Thanks for the suggestion! The online detection config is now saved into the task info instead of the DB. Please check this [commit](https://github.com/apache/incubator-pinot/pull/5769/commits/3c5f97d2326d5d77b3c13203c1816e6c49da23fa).
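   For illustration only, here is a minimal sketch of how a generated detection config could travel inside the task info JSON rather than being persisted in the DB. This is not the code in the linked commit; the `detectionConfigJson` field name and the helper class are assumptions.

   ```java
   import com.fasterxml.jackson.core.JsonProcessingException;
   import com.fasterxml.jackson.databind.ObjectMapper;
   import com.fasterxml.jackson.databind.node.ObjectNode;

   public class OnlineTaskInfoSketch {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

     /** Packs the detection config and the detection window into a single task-info JSON string. */
     public static String buildTaskInfo(String detectionConfigJson, long start, long end)
         throws JsonProcessingException {
       ObjectNode taskInfo = OBJECT_MAPPER.createObjectNode();
       taskInfo.put("start", start);                              // detection window start
       taskInfo.put("end", end);                                  // detection window end
       taskInfo.put("detectionConfigJson", detectionConfigJson);  // config rides along with the task
       return OBJECT_MAPPER.writeValueAsString(taskInfo);
     }
   }
   ```

   With this shape, the worker can rebuild the detection config from the task info at execution time, so nothing request-specific needs to be stored in the detection config table.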






[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467136635



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);
+
+      // Create & save detection
+      detectionConfigDTO =
+          generateDetectionConfig(payloadNode, nameSuffix, datasetConfigDTO, metricConfigDTO, start,
+              end);
+
+      // Create & save task
+      taskDTO = generateTaskConfig(detectionConfigDTO.getId(), start, end);
+
+      // Polling task status
+      TaskDTO polledTaskDTO = pollingTask(taskDTO.getId());
+
+      // Task failure
+      if (polledTaskDTO.getStatus() != TaskConstants.TaskStatus.COMPLETED) {
+        LOG.warn("Task is not completed after polling: " + polledTaskDTO);
+
+        responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+
+        switch (polledTaskDTO.getStatus()) {
+        case FAILED:
+          responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
+          responseMessage.put("message", "Failed to execute anomaly detection task.");

Review comment:
       Yes, same as above: I have already moved that to after this switch statement. Thanks!
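   A rough sketch of the control-flow shape being discussed, with the shared handling placed once after the switch instead of being repeated in each case. This is not the actual PR code; the status labels and the cleanup call are assumptions based on the quoted method above.

   ```java
   // Fragment in the style of the quoted onlineApi() method; illustrative only.
   responseStatus = Response.Status.INTERNAL_SERVER_ERROR;
   switch (polledTaskDTO.getStatus()) {
     case FAILED:
       responseMessage.put("message", "Failed to execute anomaly detection task.");
       break;
     case TIMEOUT:
       responseMessage.put("message", "Anomaly detection task timed out.");
       break;
     default:
       responseMessage.put("message", "Unexpected task status: " + polledTaskDTO.getStatus());
       break;
   }
   // Shared cleanup/response handling lives here, once, after the switch
   cleanExistingOnlineTask(nameSuffix);
   return Response.status(responseStatus).entity(responseMessage).build();
   ```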






[GitHub] [incubator-pinot] jasonyanwenl commented on a change in pull request #5769: [TE] add anomaly detection as a service - Phase 1

Posted by GitBox <gi...@apache.org>.
jasonyanwenl commented on a change in pull request #5769:
URL: https://github.com/apache/incubator-pinot/pull/5769#discussion_r467200428



##########
File path: thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/detection/AnomalyDetectionResource.java
##########
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.api.detection;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import io.dropwizard.auth.Auth;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.auth.ThirdEyePrincipal;
+import org.apache.pinot.thirdeye.common.metric.MetricType;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.*;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.*;
+import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
+import org.apache.pinot.thirdeye.detection.validators.DatasetConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import org.apache.pinot.thirdeye.detection.validators.MetricConfigValidator;
+import org.apache.pinot.thirdeye.detection.yaml.DetectionConfigTuner;
+import org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.jfree.util.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@Path("/anomaly-detection")
+@Api(tags = { Constants.DETECTION_TAG })
+public class AnomalyDetectionResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectionResource.class);
+
+  private static final String TEMPLATE_DETECTION_PATH = "detection-config-template.yml";
+
+  /* -------- Detection config fields -------- */
+  private static final String DETECTION_YAML_FIELD = "detectionName";
+  private static final String DEFAULT_DETECTION_NAME = "online_detection";
+
+  /* -------- Metric config fields -------- */
+  private static final String DATASET_YAML_FIELD = "dataset";
+  private static final String DEFAULT_DATASET_NAME = "online_dataset";
+  private static final String DATATYPE_YAML_FIELD = "datatype";
+  private static final MetricType DEFAULT_DATA_TYPE = MetricType.DOUBLE;
+
+  /* -------- Dataset config fields -------- */
+  private static final String METRIC_YAML_FIELD = "metric";
+  private static final String DEFAULT_METRIC_NAME = "online_metric";
+  private static final String DEFAULT_METRIC_COLUMN = "metric";
+  private static final String TIME_COLUMN_YAML_FIELD = "timeColumn";
+  private static final String DEFAULT_TIME_COLUMN = "date";
+  private static final String TIME_UNIT_YAML_FIELD = "timeUnit";
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.DAYS;
+  private static final String TIME_DURATION_YAML_FIELD = "timeDuration";
+  private static final String TIME_FORMAT_YAML_FIELD = "timeFormat";
+  private static final String DEFAULT_TIME_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String TIME_ZONE_YAML_FIELD = "timezone";
+  private static final String DEFAULT_TIME_ZONE = "US/Pacific";
+  private static final List<String> DEFAULT_DIMENSIONS =
+      Collections.unmodifiableList(new ArrayList<>());
+
+  /* -------- Request/Response field -------- */
+  private static final String DATA_FIELD = "data";
+  private static final String COLUMNS_FIELD = "columns";
+  private static final String ROWS_FIELD = "rows";
+  private static final String DATASET_FIELD = "datasetConfiguration";
+  private static final String METRIC_FIELD = "metricConfiguration";
+  private static final String DETECTION_FIELD = "detectionConfiguration";
+  private static final String ANOMALIES_FIELD = "anomalies";
+
+  /* -------- Others -------- */
+  private static final String ONLINE_DATASOURCE = "OnlineThirdEyeDataSource";
+  private static final String DETECTION_MYSQL_NAME_COLUMN = "name";
+  private static final String TASK_MYSQL_NAME_COLUMN = "name";
+  private static final String ANOMALY_ENDPOINT_URL = "/userdashboard/anomalies";
+  private static final long POLLING_SLEEP_TIME = 5L;
+  private static final int DEFAULT_TIME_DURATION = 1;
+  private static final long MAX_ONLINE_PAYLOAD_SIZE = 10 * 1024 * 1024L;
+
+  private final UserDashboardResource userDashboardResource;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DataProvider provider;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfigDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final EvaluationManager evaluationDAO;
+  private final TaskManager taskDAO;
+  private final DetectionPipelineLoader loader;
+  private final DetectionConfigValidator detectionValidator;
+  private final DatasetConfigValidator datasetConfigValidator;
+  private final MetricConfigValidator metricConfigValidator;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final Yaml yaml;
+
+  public AnomalyDetectionResource(UserDashboardResource userDashboardResource) {
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.metricConfigDAO = DAORegistry.getInstance().getMetricConfigDAO();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
+    this.userDashboardResource = userDashboardResource;
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfigDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(metricConfigDAO, datasetConfigDAO, eventDAO, anomalyDAO,
+        evaluationDAO, timeseriesLoader, aggregationLoader, loader,
+        TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
+    this.detectionValidator = new DetectionConfigValidator(this.provider);
+    this.metricConfigValidator = new MetricConfigValidator();
+    this.datasetConfigValidator = new DatasetConfigValidator();
+
+    // Read template from disk
+    this.yaml = new Yaml();
+  }
+
+  /**
+   * Run an online anomaly detection service synchronously. It will run anomaly detection using
+   * default configs for detection, metric, dataset
+   *
+   * @param start     detection window start time
+   * @param end       detection window end time
+   * @param payload   payload in request including online data
+   * @param principal user who sent this request. It's used to separate different config names
+   * @return a message containing the detected anomalies and the detection config used
+   */
+  @POST
+  @Path("/")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Request an anomaly detection online task")
+  public Response onlineApi(
+          @QueryParam("start") long start,
+          @QueryParam("end") long end,
+          @ApiParam("jsonPayload") String payload,
+          @Auth ThirdEyePrincipal principal) {
+    DatasetConfigDTO datasetConfigDTO = null;
+    MetricConfigDTO metricConfigDTO = null;
+    DetectionConfigDTO detectionConfigDTO = null;
+    TaskDTO taskDTO = null;
+    List<AnomalySummary> anomalies = null;
+    Response.Status responseStatus;
+    Map<String, String> responseMessage = new HashMap<>();
+    ObjectMapper objectMapper = new ObjectMapper();
+    // Use username to separate different requests. One user can only send one request at a time
+    String nameSuffix = "_" + principal.getName();
+
+    try {
+      if (payload.getBytes().length > MAX_ONLINE_PAYLOAD_SIZE) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Payload too large");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      JsonNode payloadNode = objectMapper.readTree(payload);
+
+      if (!validateOnlineRequestPayload(payloadNode)) {
+        responseStatus = Response.Status.BAD_REQUEST;
+        responseMessage.put("message", "Invalid request payload");
+        return Response.status(responseStatus).entity(responseMessage).build();
+      }
+
+      // Preprocess: remove existing entities generated by the previous interrupted request
+      cleanExistingOnlineTask(nameSuffix);
+
+      // Create & save dataset
+      datasetConfigDTO = generateDatasetConfig(payloadNode, nameSuffix);
+
+      // Create & save metric along with online data
+      metricConfigDTO = generateMetricConfig(payloadNode, nameSuffix);

Review comment:
       I think both ways have some pros and cons, but I personally feel a single endpoint is easier for users:
   * The main purpose of the online AD endpoint is to provide a convenient way to run AD tasks, so the endpoint should be as simple as possible. A single endpoint is more user-friendly; a separate CRUD endpoint does allow more flexibility, but it requires users to register their data first before they can use this service.
   * Secondly, the online service will not accept very large payloads, so sending the data in the request every time should not be a bottleneck (a rough sketch of such a payload follows after this comment).
   
   In addition, I think we could provide both ways. This is phase 1 of this feature; in phase 2 we could probably add another two endpoints to support what you suggested.
   
   Or we could wait for users' feedback. If there are many requests asking for separate endpoints, we should definitely do that.
   
   Thanks for your suggestions!
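   For readers following along, a minimal sketch of what a single-endpoint request payload could look like, assuming the `data`/`columns`/`rows` fields and the default `date`/`metric` column names from the quoted resource; the row values are made up for illustration.

   ```java
   import com.fasterxml.jackson.databind.ObjectMapper;
   import com.fasterxml.jackson.databind.node.ArrayNode;
   import com.fasterxml.jackson.databind.node.ObjectNode;

   public class OnlinePayloadSketch {
     public static void main(String[] args) throws Exception {
       ObjectMapper mapper = new ObjectMapper();

       ObjectNode payload = mapper.createObjectNode();
       ObjectNode data = payload.putObject("data");

       // Column names: a time column plus one metric column, matching the defaults in the resource
       ArrayNode columns = data.putArray("columns");
       columns.add("date").add("metric");

       // Each row is one observation of the online time series
       ArrayNode rows = data.putArray("rows");
       rows.addArray().add("20200701").add(100.0);
       rows.addArray().add("20200702").add(120.0);

       // This JSON string would be sent as the body of POST /anomaly-detection?start=...&end=...
       System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(payload));
     }
   }
   ```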
   



