You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2020/08/13 18:05:01 UTC
[incubator-pinot] branch master updated: [TE] Fix wrong task pickup
logic (#5855)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 09e9804 [TE] Fix wrong task pickup logic (#5855)
09e9804 is described below
commit 09e98049fa68a8c712986cb9ea5fbc700c34ade5
Author: Yanwen(Jason) Lin <34...@users.noreply.github.com>
AuthorDate: Thu Aug 13 14:04:43 2020 -0400
[TE] Fix wrong task pickup logic (#5855)
The logic in the TaskDriver for picking a task is wrong in #5769. This is the PR for fixing it.
---
.../pinot/thirdeye/anomaly/task/TaskDriver.java | 15 ++++++++----
.../pinot/thirdeye/datalayer/bao/TaskManager.java | 2 ++
.../datalayer/bao/jdbc/TaskManagerImpl.java | 27 ++++++++++++++++++++--
3 files changed, 38 insertions(+), 6 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
index bbeb8b9..ca93a77 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
@@ -194,10 +194,17 @@ public class TaskDriver {
boolean orderAscending = System.currentTimeMillis() % 2 == 0;
// find by task type to separate online task from a normal task
- TaskType type = this.isOnline ? TaskType.DETECTION_ONLINE : TaskType.DETECTION;
- anomalyTasks = taskDAO
- .findByStatusAndTypeOrderByCreateTime(TaskStatus.WAITING, type, driverConfiguration.getTaskFetchSizeCap(),
- orderAscending);
+ if (this.isOnline) {
+ anomalyTasks = taskDAO
+ .findByStatusAndTypeOrderByCreateTime(TaskStatus.WAITING,
+ TaskType.DETECTION_ONLINE, driverConfiguration.getTaskFetchSizeCap(),
+ orderAscending);
+ } else {
+ anomalyTasks = taskDAO
+ .findByStatusAndTypeNotInOrderByCreateTime(TaskStatus.WAITING,
+ TaskType.DETECTION_ONLINE, driverConfiguration.getTaskFetchSizeCap(),
+ orderAscending);
+ }
} catch (Exception e) {
hasFetchError = true;
anomalyTasks.clear();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
index 75d8560..56ff05a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
@@ -46,6 +46,8 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
List<TaskDTO> findByStatusAndTypeOrderByCreateTime(TaskStatus status, TaskConstants.TaskType type, int fetchSize, boolean asc);
+ List<TaskDTO> findByStatusAndTypeNotInOrderByCreateTime(TaskStatus status, TaskConstants.TaskType type, int fetchSize, boolean asc);
+
List<TaskDTO> findByStatusAndWorkerId(Long workerId, TaskStatus status);
boolean updateStatusAndWorkerId(Long workerId, Long id, Set<TaskStatus> allowedOldStatus, int expectedVersion);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index 2fce02b..7441219 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -58,9 +58,15 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
private static final String FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_ASC =
" WHERE status = :status and type = :type order by startTime asc limit 10";
- private static final String FIND_BY_STATUS_AND_TYPE__ORDER_BY_CREATE_TIME_DESC =
+ private static final String FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_DESC =
" WHERE status = :status and type = :type order by startTime desc limit 10";
+ private static final String FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_ASC =
+ " WHERE status = :status and type != :type order by startTime asc limit 10";
+
+ private static final String FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_DESC =
+ " WHERE status = :status and type != :type order by startTime desc limit 10";
+
private static final String FIND_BY_NAME_ORDER_BY_CREATE_TIME_ASC =
" WHERE name = :name order by createTime asc limit ";
@@ -132,7 +138,24 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
parameterMap.put("type", type.toString());
List<TaskBean> list;
String queryClause = (asc) ? FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_ASC
- : FIND_BY_STATUS_AND_TYPE__ORDER_BY_CREATE_TIME_DESC;
+ : FIND_BY_STATUS_AND_TYPE_ORDER_BY_CREATE_TIME_DESC;
+ list = genericPojoDao.executeParameterizedSQL(queryClause, parameterMap, TaskBean.class);
+ List<TaskDTO> result = new ArrayList<>();
+ for (TaskBean bean : list) {
+ result.add(MODEL_MAPPER.map(bean, TaskDTO.class));
+ }
+ return result;
+ }
+
+ @Override
+ public List<TaskDTO> findByStatusAndTypeNotInOrderByCreateTime(TaskStatus status,
+ TaskConstants.TaskType type, int fetchSize, boolean asc) {
+ Map<String, Object> parameterMap = new HashMap<>();
+ parameterMap.put("status", status.toString());
+ parameterMap.put("type", type.toString());
+ List<TaskBean> list;
+ String queryClause = (asc) ? FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_ASC
+ : FIND_BY_STATUS_AND_TYPE_NOT_IN_ORDER_BY_CREATE_TIME_DESC;
list = genericPojoDao.executeParameterizedSQL(queryClause, parameterMap, TaskBean.class);
List<TaskDTO> result = new ArrayList<>();
for (TaskBean bean : list) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org