You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/11/13 13:15:51 UTC

[incubator-pinot] branch planningPhase created (now 061fa3c)

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a change to branch planningPhase
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 061fa3c  [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks

This branch includes the following new commits:

     new 061fa3c  [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch planningPhase
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 061fa3ca98f4e691b5069ca5e87ba1ec8f53833c
Author: Sunitha Beeram <sb...@sbeeram-ld2.linkedin.biz>
AuthorDate: Mon Nov 12 22:49:53 2018 -0800

    [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks
---
 .../linkedin/pinot/core/plan/CombinePlanNode.java  | 22 ++++++++++------
 .../pinot/core/plan/CombinePlanNodeTest.java       | 30 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
index 72f04c1..bd03a94 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.Op;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class CombinePlanNode implements PlanNode {
   private static final Logger LOGGER = LoggerFactory.getLogger(CombinePlanNode.class);
 
-  private static final int NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN = 10;
+  private static final int MAX_PLAN_TASKS = Math.min(10, (int) (Runtime.getRuntime().availableProcessors() * .5));
   private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;
 
   private final List<PlanNode> _planNodes;
@@ -67,7 +68,7 @@ public class CombinePlanNode implements PlanNode {
     int numPlanNodes = _planNodes.size();
     List<Operator> operators = new ArrayList<>(numPlanNodes);
 
-    if (numPlanNodes < NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN) {
+    if (numPlanNodes < MAX_PLAN_TASKS) {
       // Small number of plan nodes, run them sequentially
       for (PlanNode planNode : _planNodes) {
         operators.add(planNode.run());
@@ -79,13 +80,17 @@ public class CombinePlanNode implements PlanNode {
       long endTime = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;
 
       // Submit all jobs
-      Future[] futures = new Future[numPlanNodes];
-      for (int i = 0; i < numPlanNodes; i++) {
+      Future[] futures = new Future[MAX_PLAN_TASKS];
+      for (int i = 0; i < MAX_PLAN_TASKS; i++) {
         final int index = i;
-        futures[i] = _executorService.submit(new TraceCallable<Operator>() {
+        futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() {
           @Override
-          public Operator callJob() throws Exception {
-            return _planNodes.get(index).run();
+          public List<Operator> callJob() throws Exception {
+            List<Operator> operators = new ArrayList<>();
+            for(int count = index; count < numPlanNodes; count = count + MAX_PLAN_TASKS) {
+              operators.add(_planNodes.get(count).run());
+            }
+            return operators;
           }
         });
       }
@@ -93,7 +98,8 @@ public class CombinePlanNode implements PlanNode {
       // Get all results
       try {
         for (Future future : futures) {
-          operators.add((Operator) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+          List<Operator> ops = (List<Operator>) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+          operators.addAll(ops);
         }
       } catch (Exception e) {
         // Future object will throw ExecutionException for execution exception, need to check the cause to determine
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
index b27dc94..b79c825 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
@@ -15,12 +15,14 @@
  */
 package com.linkedin.pinot.core.plan;
 
+import com.linkedin.pinot.common.request.BrokerRequest;
 import com.linkedin.pinot.core.common.Operator;
 import com.linkedin.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
 import org.testng.annotations.Test;
 
@@ -28,6 +30,34 @@ import org.testng.annotations.Test;
 public class CombinePlanNodeTest {
   private ExecutorService _executorService = Executors.newFixedThreadPool(10);
 
+  /**
+   * Tests that the tasks are executed as expected in parallel mode.
+   */
+  @Test
+  public void testParallelExecution() {
+    AtomicInteger count = new AtomicInteger(0);
+    int numPlans = 42;
+    List<PlanNode> planNodes = new ArrayList<>();
+    for (int i = 0; i < numPlans; i++) {
+      planNodes.add(new PlanNode() {
+        @Override
+        public Operator run() {
+          count.incrementAndGet();
+          return null;
+        }
+
+        @Override
+        public void showTree(String prefix) {
+        }
+      });
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, new BrokerRequest(), _executorService, 1000,
+            InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    combinePlanNode.run();
+    Assert.assertEquals(numPlans, count.get());
+  }
+
   @Test
   public void testSlowPlanNode() {
     // Warning: this test is slow (take 10 seconds).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org