Posted to commits@pinot.apache.org by su...@apache.org on 2018/11/16 00:43:51 UTC

[incubator-pinot] branch master updated: [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks (#3470)

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 285649f  [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks (#3470)
285649f is described below

commit 285649fee6893b1ba49ea5af545306b56569821e
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Thu Nov 15 16:43:46 2018 -0800

    [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks (#3470)
    
    * [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks
    
    * [PINOT-7328] Remove unused import
    
    * [PINOT-7328] Address review comments
    
    * [PINOT-7328] Address review comments
---
 .../linkedin/pinot/core/plan/CombinePlanNode.java  | 28 ++++++++++++-----
 .../pinot/core/plan/CombinePlanNodeTest.java       | 36 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 8 deletions(-)
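
The change replaces one executor task per plan node with one task per batch of plan nodes, so a query over thousands of segments now submits at most a handful of tasks to the shared executor instead of thousands, which is where the reduction in lock contention comes from. Below is a minimal standalone sketch of the batch-sizing arithmetic used in the diff; the class name, sample inputs, and the hard-coded cap of 10 are illustrative only (in the commit, MAX_PLAN_THREADS is additionally bounded by half the available processors).

// Illustrative sketch of the batch sizing in the diff below; the constants mirror
// MAX_PLAN_THREADS (capped at 10) and MIN_TASKS_PER_THREAD, but the class name
// and the sample inputs are made up for this example.
public class BatchSizingSketch {

  public static void main(String[] args) {
    int maxPlanThreads = 10;     // upper bound on parallel tasks
    int minTasksPerThread = 10;  // below this, a separate task is not worth submitting

    for (int numPlanNodes : new int[]{15, 100, 5000}) {
      // ceil(numPlanNodes / minTasksPerThread) using integer arithmetic, capped at maxPlanThreads
      int threads = Math.min(
          numPlanNodes / minTasksPerThread + ((numPlanNodes % minTasksPerThread == 0) ? 0 : 1),
          maxPlanThreads);
      // ceil(numPlanNodes / threads), but never smaller than minTasksPerThread
      int opsPerThread = Math.max(
          numPlanNodes / threads + ((numPlanNodes % threads == 0) ? 0 : 1),
          minTasksPerThread);
      System.out.println(numPlanNodes + " plan nodes -> "
          + threads + " task(s), up to " + opsPerThread + " nodes per task");
    }
  }
}

With 5,000 plan nodes this yields 10 tasks of up to 500 nodes each instead of 5,000 single-node tasks.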

diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
index 72f04c1..95556f6 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java
@@ -36,7 +36,8 @@ import org.slf4j.LoggerFactory;
 public class CombinePlanNode implements PlanNode {
   private static final Logger LOGGER = LoggerFactory.getLogger(CombinePlanNode.class);
 
-  private static final int NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN = 10;
+  private static final int MAX_PLAN_THREADS = Math.min(10, (int) (Runtime.getRuntime().availableProcessors() * .5));
+  private static final int MIN_TASKS_PER_THREAD = 10;
   private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;
 
   private final List<PlanNode> _planNodes;
@@ -67,7 +68,7 @@ public class CombinePlanNode implements PlanNode {
     int numPlanNodes = _planNodes.size();
     List<Operator> operators = new ArrayList<>(numPlanNodes);
 
-    if (numPlanNodes < NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN) {
+    if (numPlanNodes <= MIN_TASKS_PER_THREAD) {
       // Small number of plan nodes, run them sequentially
       for (PlanNode planNode : _planNodes) {
         operators.add(planNode.run());
@@ -78,14 +79,24 @@ public class CombinePlanNode implements PlanNode {
       // Calculate the time out timestamp
       long endTime = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;
 
+      int threads = Math.min(numPlanNodes/MIN_TASKS_PER_THREAD + ((numPlanNodes % MIN_TASKS_PER_THREAD == 0) ? 0 : 1), // ceil without using double arithmetic
+          MAX_PLAN_THREADS);
+      int opsPerThread = Math.max(numPlanNodes/threads + ((numPlanNodes % threads == 0) ? 0 : 1), // ceil without using double arithmetic
+          MIN_TASKS_PER_THREAD);
       // Submit all jobs
-      Future[] futures = new Future[numPlanNodes];
-      for (int i = 0; i < numPlanNodes; i++) {
+      Future[] futures = new Future[threads];
+      for (int i = 0; i < threads; i++) {
         final int index = i;
-        futures[i] = _executorService.submit(new TraceCallable<Operator>() {
+        futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() {
           @Override
-          public Operator callJob() throws Exception {
-            return _planNodes.get(index).run();
+          public List<Operator> callJob() throws Exception {
+            List<Operator> operators = new ArrayList<>();
+            int start = index * opsPerThread;
+            int limit = Math.min(opsPerThread, numPlanNodes - start);
+            for(int count = start; count < start + limit; count++) {
+              operators.add(_planNodes.get(count).run());
+            }
+            return operators;
           }
         });
       }
@@ -93,7 +104,8 @@ public class CombinePlanNode implements PlanNode {
       // Get all results
       try {
         for (Future future : futures) {
-          operators.add((Operator) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+          List<Operator> ops = (List<Operator>) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+          operators.addAll(ops);
         }
       } catch (Exception e) {
         // Future object will throw ExecutionException for execution exception, need to check the cause to determine
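
Each submitted callable above now returns the operators for its whole slice of plan nodes, and the result loop charges every Future.get against a single deadline, so the total wait stays bounded by TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN rather than growing with the number of futures. The following self-contained sketch shows that chunked-submission-plus-shared-deadline pattern in isolation; the class name and the toy squaring work are illustrative stand-ins, and the TraceCallable wrapper used in the commit is dropped for brevity.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: split N work items into a few batches, submit one
// callable per batch, and drain the futures against one shared deadline.
public class ChunkedSubmitSketch {

  public static void main(String[] args) throws Exception {
    int numItems = 37;      // stand-in for numPlanNodes
    int threads = 4;        // stand-in for the computed task count
    int opsPerThread = 10;  // stand-in for the computed batch size
    ExecutorService executor = Executors.newFixedThreadPool(threads);

    List<Future<List<Integer>>> futures = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      final int index = i;
      futures.add(executor.submit(() -> {
        // Each callable handles the contiguous slice [start, start + limit)
        List<Integer> results = new ArrayList<>();
        int start = index * opsPerThread;
        int limit = Math.min(opsPerThread, numItems - start);
        for (int count = start; count < start + limit; count++) {
          results.add(count * count);  // stand-in for _planNodes.get(count).run()
        }
        return results;
      }));
    }

    // One deadline for the whole drain: each get() only waits for what is left of the budget.
    long endTime = System.currentTimeMillis() + 10_000;
    List<Integer> all = new ArrayList<>();
    for (Future<List<Integer>> future : futures) {
      all.addAll(future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
    }
    executor.shutdown();
    System.out.println(all.size() + " results collected");  // prints "37 results collected"
  }
}

Running the sketch collects all 37 results: the last batch simply picks up the remainder (7 items here) because limit is clamped to numItems - start.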
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
index b27dc94..63c2a35 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java
@@ -15,12 +15,15 @@
  */
 package com.linkedin.pinot.core.plan;
 
+import com.linkedin.pinot.common.request.BrokerRequest;
 import com.linkedin.pinot.core.common.Operator;
 import com.linkedin.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.Assert;
 import org.testng.annotations.Test;
 
@@ -28,6 +31,39 @@ import org.testng.annotations.Test;
 public class CombinePlanNodeTest {
   private ExecutorService _executorService = Executors.newFixedThreadPool(10);
 
+  /**
+   * Tests that the tasks are executed as expected in parallel mode.
+   */
+  @Test
+  public void testParallelExecution() {
+
+    AtomicInteger count = new AtomicInteger(0);
+
+    Random rand = new Random();
+    for (int i = 0; i < 5; ++i) {
+      count.set(0);
+      int numPlans = rand.nextInt(5000);
+      List<PlanNode> planNodes = new ArrayList<>();
+      for (int index = 0; index < numPlans; index++) {
+        planNodes.add(new PlanNode() {
+          @Override
+          public Operator run() {
+            count.incrementAndGet();
+            return null;
+          }
+
+          @Override
+          public void showTree(String prefix) {
+          }
+        });
+      }
+      CombinePlanNode combinePlanNode =
+          new CombinePlanNode(planNodes, new BrokerRequest(), _executorService, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+      combinePlanNode.run();
+      Assert.assertEquals(numPlans, count.get());
+    }
+  }
+
   @Test
   public void testSlowPlanNode() {
     // Warning: this test is slow (takes 10 seconds).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org