You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/11/13 13:15:52 UTC
[incubator-pinot] 01/01: [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch planningPhase
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 061fa3ca98f4e691b5069ca5e87ba1ec8f53833c
Author: Sunitha Beeram <sb...@sbeeram-ld2.linkedin.biz>
AuthorDate: Mon Nov 12 22:49:53 2018 -0800
[PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks
---
.../linkedin/pinot/core/plan/CombinePlanNode.java | 22 ++++++++++------
.../pinot/core/plan/CombinePlanNodeTest.java | 30 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
index 72f04c1..bd03a94 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.Op;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class CombinePlanNode implements PlanNode {
private static final Logger LOGGER = LoggerFactory.getLogger(CombinePlanNode.class);
- private static final int NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN = 10;
+ private static final int MAX_PLAN_TASKS = Math.min(10, (int) (Runtime.getRuntime().availableProcessors() * .5));
private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;
private final List<PlanNode> _planNodes;
@@ -67,7 +68,7 @@ public class CombinePlanNode implements PlanNode {
int numPlanNodes = _planNodes.size();
List<Operator> operators = new ArrayList<>(numPlanNodes);
- if (numPlanNodes < NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN) {
+ if (numPlanNodes < MAX_PLAN_TASKS) {
// Small number of plan nodes, run them sequentially
for (PlanNode planNode : _planNodes) {
operators.add(planNode.run());
@@ -79,13 +80,17 @@ public class CombinePlanNode implements PlanNode {
long endTime = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;
// Submit all jobs
- Future[] futures = new Future[numPlanNodes];
- for (int i = 0; i < numPlanNodes; i++) {
+ Future[] futures = new Future[MAX_PLAN_TASKS];
+ for (int i = 0; i < MAX_PLAN_TASKS; i++) {
final int index = i;
- futures[i] = _executorService.submit(new TraceCallable<Operator>() {
+ futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() {
@Override
- public Operator callJob() throws Exception {
- return _planNodes.get(index).run();
+ public List<Operator> callJob() throws Exception {
+ List<Operator> operators = new ArrayList<>();
+ for(int count = index; count < numPlanNodes; count = count + MAX_PLAN_TASKS) {
+ operators.add(_planNodes.get(count).run());
+ }
+ return operators;
}
});
}
@@ -93,7 +98,8 @@ public class CombinePlanNode implements PlanNode {
// Get all results
try {
for (Future future : futures) {
- operators.add((Operator) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+ List<Operator> ops = (List<Operator>) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ operators.addAll(ops);
}
} catch (Exception e) {
// Future object will throw ExecutionException for execution exception, need to check the cause to determine
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
index b27dc94..b79c825 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
@@ -15,12 +15,14 @@
*/
package com.linkedin.pinot.core.plan;
+import com.linkedin.pinot.common.request.BrokerRequest;
import com.linkedin.pinot.core.common.Operator;
import com.linkedin.pinot.core.plan.maker.InstancePlanMakerImplV2;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.testng.annotations.Test;
@@ -28,6 +30,34 @@ import org.testng.annotations.Test;
public class CombinePlanNodeTest {
private ExecutorService _executorService = Executors.newFixedThreadPool(10);
+ /**
+ * Tests that the tasks are executed as expected in parallel mode.
+ */
+ @Test
+ public void testParallelExecution() {
+ AtomicInteger count = new AtomicInteger(0);
+ int numPlans = 42;
+ List<PlanNode> planNodes = new ArrayList<>();
+ for (int i = 0; i < numPlans; i++) {
+ planNodes.add(new PlanNode() {
+ @Override
+ public Operator run() {
+ count.incrementAndGet();
+ return null;
+ }
+
+ @Override
+ public void showTree(String prefix) {
+ }
+ });
+ }
+ CombinePlanNode combinePlanNode =
+ new CombinePlanNode(planNodes, new BrokerRequest(), _executorService, 1000,
+ InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+ combinePlanNode.run();
+ Assert.assertEquals(numPlans, count.get());
+ }
+
@Test
public void testSlowPlanNode() {
// Warning: this test is slow (take 10 seconds).
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org