You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2019/01/03 00:59:21 UTC

[incubator-pinot] branch master updated: [TE] task - randomize execution order to increase parallel throughput (#3634)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 24897f6  [TE] task - randomize execution order to increase parallel throughput (#3634)
24897f6 is described below

commit 24897f683e7f28e1626b6989ada5e16eb7bc3cca
Author: Alexander Pucher <ap...@linkedin.com>
AuthorDate: Wed Jan 2 16:59:16 2019 -0800

    [TE] task - randomize execution order to increase parallel throughput (#3634)
---
 .../src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java | 5 +++++
 .../com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java    | 4 ++--
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index 5b43998..3e81042 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -21,6 +21,7 @@ import com.linkedin.thirdeye.anomaly.utils.AnomalyUtils;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
@@ -134,6 +135,7 @@ public class TaskDriver {
       List<TaskDTO> anomalyTasks = new ArrayList<>();
       boolean hasFetchError = false;
       try {
+        // randomly fetch from either the head or the tail of the waiting tasks to reduce synchronized patterns across threads (and hosts)
         boolean orderAscending = System.currentTimeMillis() % 2 == 0;
         anomalyTasks = taskDAO
             .findByStatusOrderByCreateTime(TaskStatus.WAITING, driverConfiguration.getTaskFetchSizeCap(),
@@ -147,6 +149,9 @@ public class TaskDriver {
       if (CollectionUtils.isNotEmpty(anomalyTasks)) {
         LOG.info("Thread {} : Found {} tasks in waiting state", Thread.currentThread().getId(), anomalyTasks.size());
 
+        // shuffle candidate tasks to avoid synchronized patterns across threads (and hosts)
+        Collections.shuffle(anomalyTasks);
+
         for (int i = 0; i < anomalyTasks.size() && !shutdown; i++) {
           TaskDTO anomalyTaskSpec = anomalyTasks.get(i);
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index b44db6d..141a330 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -38,10 +38,10 @@ import com.linkedin.thirdeye.datalayer.util.Predicate;
 public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements TaskManager {
 
   private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_ASC =
-      " WHERE status = :status order by startTime asc limit 5";
+      " WHERE status = :status order by startTime asc limit 10";
 
   private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_DESC =
-      " WHERE status = :status order by startTime desc limit 5";
+      " WHERE status = :status order by startTime desc limit 10";
 
   public TaskManagerImpl() {
     super(TaskDTO.class, TaskBean.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org