You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2019/01/03 00:59:21 UTC
[incubator-pinot] branch master updated: [TE] task - randomize
execution order to increase parallel throughput (#3634)
This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 24897f6 [TE] task - randomize execution order to increase parallel throughput (#3634)
24897f6 is described below
commit 24897f683e7f28e1626b6989ada5e16eb7bc3cca
Author: Alexander Pucher <ap...@linkedin.com>
AuthorDate: Wed Jan 2 16:59:16 2019 -0800
[TE] task - randomize execution order to increase parallel throughput (#3634)
---
.../src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java | 5 +++++
.../com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java | 4 ++--
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
index 5b43998..3e81042 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/task/TaskDriver.java
@@ -21,6 +21,7 @@ import com.linkedin.thirdeye.anomaly.utils.AnomalyUtils;
import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@@ -134,6 +135,7 @@ public class TaskDriver {
List<TaskDTO> anomalyTasks = new ArrayList<>();
boolean hasFetchError = false;
try {
+ // randomize fetching head and tail to reduce synchronized patterns across threads (and hosts)
boolean orderAscending = System.currentTimeMillis() % 2 == 0;
anomalyTasks = taskDAO
.findByStatusOrderByCreateTime(TaskStatus.WAITING, driverConfiguration.getTaskFetchSizeCap(),
@@ -147,6 +149,9 @@ public class TaskDriver {
if (CollectionUtils.isNotEmpty(anomalyTasks)) {
LOG.info("Thread {} : Found {} tasks in waiting state", Thread.currentThread().getId(), anomalyTasks.size());
+ // shuffle candidate tasks to avoid synchronized patterns across threads (and hosts)
+ Collections.shuffle(anomalyTasks);
+
for (int i = 0; i < anomalyTasks.size() && !shutdown; i++) {
TaskDTO anomalyTaskSpec = anomalyTasks.get(i);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index b44db6d..141a330 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -38,10 +38,10 @@ import com.linkedin.thirdeye.datalayer.util.Predicate;
public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements TaskManager {
private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_ASC =
- " WHERE status = :status order by startTime asc limit 5";
+ " WHERE status = :status order by startTime asc limit 10";
private static final String FIND_BY_STATUS_ORDER_BY_CREATE_TIME_DESC =
- " WHERE status = :status order by startTime desc limit 5";
+ " WHERE status = :status order by startTime desc limit 10";
public TaskManagerImpl() {
super(TaskDTO.class, TaskBean.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org