You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/11/13 13:15:52 UTC

[incubator-pinot] 01/01: [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch planningPhase
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 061fa3ca98f4e691b5069ca5e87ba1ec8f53833c
Author: Sunitha Beeram <sb...@sbeeram-ld2.linkedin.biz>
AuthorDate: Mon Nov 12 22:49:53 2018 -0800

    [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks
---
 .../linkedin/pinot/core/plan/CombinePlanNode.java  | 22 ++++++++++------
 .../pinot/core/plan/CombinePlanNodeTest.java       | 30 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
index 72f04c1..bd03a94 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.Op;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class CombinePlanNode implements PlanNode {
   private static final Logger LOGGER = LoggerFactory.getLogger(CombinePlanNode.class);
 
-  private static final int NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN = 10;
+  private static final int MAX_PLAN_TASKS = Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));
   private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;
 
   private final List<PlanNode> _planNodes;
@@ -67,7 +68,7 @@ public class CombinePlanNode implements PlanNode {
     int numPlanNodes = _planNodes.size();
     List<Operator> operators = new ArrayList<>(numPlanNodes);
 
-    if (numPlanNodes < NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN) {
+    if (numPlanNodes < MAX_PLAN_TASKS) {
       // Small number of plan nodes, run them sequentially
       for (PlanNode planNode : _planNodes) {
         operators.add(planNode.run());
@@ -79,13 +80,17 @@ public class CombinePlanNode implements PlanNode {
       long endTime = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;
 
       // Submit all jobs
-      Future[] futures = new Future[numPlanNodes];
-      for (int i = 0; i < numPlanNodes; i++) {
+      Future[] futures = new Future[MAX_PLAN_TASKS];
+      for (int i = 0; i < MAX_PLAN_TASKS; i++) {
         final int index = i;
-        futures[i] = _executorService.submit(new TraceCallable<Operator>() {
+        futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() {
           @Override
-          public Operator callJob() throws Exception {
-            return _planNodes.get(index).run();
+          public List<Operator> callJob() throws Exception {
+            List<Operator> operators = new ArrayList<>();
+            for(int count = index; count < numPlanNodes; count = count + MAX_PLAN_TASKS) {
+              operators.add(_planNodes.get(count).run());
+            }
+            return operators;
           }
         });
       }
@@ -93,7 +98,8 @@ public class CombinePlanNode implements PlanNode {
       // Get all results
       try {
         for (Future future : futures) {
-          operators.add((Operator) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+          List<Operator> ops = (List<Operator>) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+          operators.addAll(ops);
         }
       } catch (Exception e) {
         // Future object will throw ExecutionException for execution exception, need to check the cause to determine
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
index b27dc94..b79c825 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
@@ -15,12 +15,14 @@
  */
 package com.linkedin.pinot.core.plan;
 
+import com.linkedin.pinot.common.request.BrokerRequest;
 import com.linkedin.pinot.core.common.Operator;
 import com.linkedin.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
 import org.testng.annotations.Test;
 
@@ -28,6 +30,34 @@ import org.testng.annotations.Test;
 public class CombinePlanNodeTest {
   private ExecutorService _executorService = Executors.newFixedThreadPool(10);
 
+  /**
+   * Runs 42 counting plan nodes through CombinePlanNode and checks that run() was invoked 42 times in total.
+   */
+  @Test
+  public void testParallelExecution() {
+    AtomicInteger count = new AtomicInteger(0);  // total PlanNode.run() calls; incremented from executor threads
+    int numPlans = 42;  // >= MAX_PLAN_TASKS (at most 10), so run() does not take the sequential branch
+    List<PlanNode> planNodes = new ArrayList<>();
+    for (int i = 0; i < numPlans; i++) {
+      planNodes.add(new PlanNode() {
+        @Override
+        public Operator run() {
+          count.incrementAndGet();
+          return null;  // only the call count is asserted below
+        }
+
+        @Override
+        public void showTree(String prefix) {
+        }
+      });
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, new BrokerRequest(), _executorService, 1000,
+            InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    combinePlanNode.run();
+    Assert.assertEquals(numPlans, count.get());
+  }
+
   @Test
   public void testSlowPlanNode() {
     // Warning: this test is slow (take 10 seconds).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org