You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/11/16 00:43:51 UTC
[incubator-pinot] branch master updated: [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks (#3470)
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 285649f [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks (#3470)
285649f is described below
commit 285649fee6893b1ba49ea5af545306b56569821e
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Thu Nov 15 16:43:46 2018 -0800
[PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks (#3470)
* [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks
* [PINOT-7328] Remove unused import
* [PINOT-7328] Address review comments
* [PINOT-7328] Address review comments
---
.../linkedin/pinot/core/plan/CombinePlanNode.java | 28 ++++++++++++-----
.../pinot/core/plan/CombinePlanNodeTest.java | 36 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 8 deletions(-)
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
index 72f04c1..95556f6 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
@@ -36,7 +36,8 @@ import org.slf4j.LoggerFactory;
public class CombinePlanNode implements PlanNode {
private static final Logger LOGGER = LoggerFactory.getLogger(CombinePlanNode.class);
- private static final int NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN = 10;
+ private static final int MAX_PLAN_THREADS = Math.min(10, (int) (Runtime.getRuntime().availableProcessors() * .5));
+ private static final int MIN_TASKS_PER_THREAD = 10;
private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;
private final List<PlanNode> _planNodes;
@@ -67,7 +68,7 @@ public class CombinePlanNode implements PlanNode {
int numPlanNodes = _planNodes.size();
List<Operator> operators = new ArrayList<>(numPlanNodes);
- if (numPlanNodes < NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN) {
+ if (numPlanNodes <= MIN_TASKS_PER_THREAD) {
// Small number of plan nodes, run them sequentially
for (PlanNode planNode : _planNodes) {
operators.add(planNode.run());
@@ -78,14 +79,24 @@ public class CombinePlanNode implements PlanNode {
// Calculate the time out timestamp
long endTime = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;
+ int threads = Math.min(numPlanNodes/MIN_TASKS_PER_THREAD + ((numPlanNodes % MIN_TASKS_PER_THREAD == 0) ? 0 : 1), // ceil without using double arithmetic
+ MAX_PLAN_THREADS);
+ int opsPerThread = Math.max(numPlanNodes/threads + ((numPlanNodes % threads == 0) ? 0 : 1), // ceil without using double arithmetic
+ MIN_TASKS_PER_THREAD);
// Submit all jobs
- Future[] futures = new Future[numPlanNodes];
- for (int i = 0; i < numPlanNodes; i++) {
+ Future[] futures = new Future[threads];
+ for (int i = 0; i < threads; i++) {
final int index = i;
- futures[i] = _executorService.submit(new TraceCallable<Operator>() {
+ futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() {
@Override
- public Operator callJob() throws Exception {
- return _planNodes.get(index).run();
+ public List<Operator> callJob() throws Exception {
+ List<Operator> operators = new ArrayList<>();
+ int start = index * opsPerThread;
+ int limit = Math.min(opsPerThread, numPlanNodes - start);
+ for(int count = start; count < start + limit; count++) {
+ operators.add(_planNodes.get(count).run());
+ }
+ return operators;
}
});
}
@@ -93,7 +104,8 @@ public class CombinePlanNode implements PlanNode {
// Get all results
try {
for (Future future : futures) {
- operators.add((Operator) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+ List<Operator> ops = (List<Operator>) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ operators.addAll(ops);
}
} catch (Exception e) {
// Future object will throw ExecutionException for execution exception, need to check the cause to determine
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
index b27dc94..63c2a35 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
@@ -15,12 +15,15 @@
*/
package com.linkedin.pinot.core.plan;
+import com.linkedin.pinot.common.request.BrokerRequest;
import com.linkedin.pinot.core.common.Operator;
import com.linkedin.pinot.core.plan.maker.InstancePlanMakerImplV2;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.testng.annotations.Test;
@@ -28,6 +31,39 @@ import org.testng.annotations.Test;
public class CombinePlanNodeTest {
private ExecutorService _executorService = Executors.newFixedThreadPool(10);
+ /**
+ * Tests that the tasks are executed as expected in parallel mode.
+ */
+ @Test
+ public void testParallelExecution() {
+
+ AtomicInteger count = new AtomicInteger(0);
+
+ Random rand = new Random();
+ for (int i = 0; i < 5; ++i) {
+ count.set(0);
+ int numPlans = rand.nextInt(5000);
+ List<PlanNode> planNodes = new ArrayList<>();
+ for (int index = 0; index < numPlans; index++) {
+ planNodes.add(new PlanNode() {
+ @Override
+ public Operator run() {
+ count.incrementAndGet();
+ return null;
+ }
+
+ @Override
+ public void showTree(String prefix) {
+ }
+ });
+ }
+ CombinePlanNode combinePlanNode =
+ new CombinePlanNode(planNodes, new BrokerRequest(), _executorService, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ combinePlanNode.run();
+ Assert.assertEquals(numPlans, count.get());
+ }
+ }
+
@Test
public void testSlowPlanNode() {
// Warning: this test is slow (take 10 seconds).
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org