You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2020/08/13 18:05:01 UTC

[incubator-pinot] branch master updated: [TE] Fix wrong task pickup logic (#5855)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 09e9804  [TE] Fix wrong task pickup logic (#5855)
09e9804 is described below

commit 09e98049fa68a8c712986cb9ea5fbc700c34ade5
Author: Yanwen(Jason) Lin <34...@users.noreply.github.com>
AuthorDate: Thu Aug 13 14:04:43 2020 -0400

    [TE] Fix wrong task pickup logic (#5855)
    
    The task pickup logic added to TaskDriver in #5769 is wrong: non-online workers fetched only DETECTION tasks instead of every task type other than DETECTION_ONLINE. This PR fixes it.
---
 .../pinot/thirdeye/anomaly/task/TaskDriver.java    | 15 ++++++++----
 .../pinot/thirdeye/datalayer/bao/TaskManager.java  |  2 ++
 .../datalayer/bao/jdbc/TaskManagerImpl.java        | 27 ++++++++++++++++++++--
 3 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
index bbeb8b9..ca93a77 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
@@ -194,10 +194,17 @@ public class TaskDriver {
         boolean orderAscending = System.currentTimeMillis() % 2 == 0;
 
         // find by task type to separate online task from a normal task
-        TaskType type = this.isOnline ? TaskType.DETECTION_ONLINE : TaskType.DETECTION;
-        anomalyTasks = taskDAO
-            .findByStatusAndTypeOrderByCreateTime(TaskStatus.WAITING, type, driverConfiguration.getTaskFetchSizeCap(),
-                orderAscending);
+        if (this.isOnline) {
+          anomalyTasks = taskDAO
+              .findByStatusAndTypeOrderByCreateTime(TaskStatus.WAITING,
+                  TaskType.DETECTION_ONLINE, driverConfiguration.getTaskFetchSizeCap(),
+                  orderAscending);
+        } else {
+          anomalyTasks = taskDAO
+              .findByStatusAndTypeNotInOrderByCreateTime(TaskStatus.WAITING,
+                  TaskType.DETECTION_ONLINE, driverConfiguration.getTaskFetchSizeCap(),
+                  orderAscending);
+        }
       } catch (Exception e) {
         hasFetchError = true;
         anomalyTasks.clear();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
index 75d8560..56ff05a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
@@ -46,6 +46,8 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
 
   List<TaskDTO> findByStatusAndTypeOrderByCreateTime(TaskStatus status, TaskConstants.TaskType type, int fetchSize, boolean asc);
 
+  List<TaskDTO> findByStatusAndTypeNotInOrderByCreateTime(TaskStatus status, TaskConstants.TaskType type, int fetchSize, boolean asc);
+
   List<TaskDTO> findByStatusAndWorkerId(Long workerId, TaskStatus status);
 
   boolean updateStatusAndWorkerId(Long workerId, Long id, Set<TaskStatus> allowedOldStatus, int expectedVersion);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index 2fce02b..7441219 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -58,9 +58,15 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
   private static final String FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_ASC =
           " WHERE status = :status and type = :type order by startTime asc limit 10";
 
-  private static final String FIND_BY_STATUS_AND_TYPE__ORDER_BY_CREATE_TIME_DESC =
+  private static final String FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_DESC =
           " WHERE status = :status and type = :type order by startTime desc limit 10";
 
+  private static final String FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_ASC =
+      " WHERE status = :status and type != :type order by startTime asc limit 10";
+
+  private static final String FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_DESC =
+      " WHERE status = :status and type != :type order by startTime desc limit 10";
+
   private static final String FIND_BY_NAME_ORDER_BY_CREATE_TIME_ASC =
       " WHERE name = :name order by createTime asc limit ";
 
@@ -132,7 +138,24 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
     parameterMap.put("type", type.toString());
     List<TaskBean> list;
     String queryClause = (asc) ? FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_ASC
-            : FIND_BY_STATUS_AND_TYPE__ORDER_BY_CREATE_TIME_DESC;
+            : FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_DESC;
+    list = genericPojoDao.executeParameterizedSQL(queryClause, parameterMap, TaskBean.class);
+    List<TaskDTO> result = new ArrayList<>();
+    for (TaskBean bean : list) {
+      result.add(MODEL_MAPPER.map(bean, TaskDTO.class));
+    }
+    return result;
+  }
+
+  @Override
+  public List<TaskDTO> findByStatusAndTypeNotInOrderByCreateTime(TaskStatus status,
+      TaskConstants.TaskType type, int fetchSize, boolean asc) {
+    Map<String, Object> parameterMap = new HashMap<>();
+    parameterMap.put("status", status.toString());
+    parameterMap.put("type", type.toString());
+    List<TaskBean> list;
+    String queryClause = (asc) ? FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_ASC
+        : FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_DESC;
     list = genericPojoDao.executeParameterizedSQL(queryClause, parameterMap, TaskBean.class);
     List<TaskDTO> result = new ArrayList<>();
     for (TaskBean bean : list) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org