You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/03/18 01:08:53 UTC

[incubator-pinot] branch master updated: Combine operators: remove redundant variables and override logger in subclass (#6690)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new acc24a1  Combine operators: remove redundant variables and override logger in subclass (#6690)
acc24a1 is described below

commit acc24a105f3c2d728ed2589e4838108769e82e2c
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Wed Mar 17 18:08:32 2021 -0700

    Combine operators: remove redundant variables and override logger in subclass (#6690)
    
    - remove redundant variables in subclass
    - override logger in subclass for easy debugging
    - remove unnecessary constructor in BaaseCombineOperator since subclass has access to protected variables in supper class.
---
 .../core/operator/combine/BaseCombineOperator.java   |  9 ++++-----
 .../operator/combine/GroupByCombineOperator.java     |  1 +
 .../combine/GroupByOrderByCombineOperator.java       |  1 +
 ...MaxValueBasedSelectionOrderByCombineOperator.java | 20 ++++++--------------
 .../combine/SelectionOnlyCombineOperator.java        |  3 +++
 .../combine/SelectionOrderByCombineOperator.java     |  3 +++
 6 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 1ed5fc1..aef6cd1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -45,23 +45,22 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings("rawtypes")
 public abstract class BaseCombineOperator extends BaseOperator<IntermediateResultsBlock> {
-  protected static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
 
   protected final List<Operator> _operators;
   protected final QueryContext _queryContext;
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
-
   protected final int _numOperators;
-  protected int _numThreads;
   // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
   // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
   // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
   // behavior (even JVM crash) when processing queries against it.
   protected final Phaser _phaser = new Phaser(1);
-  protected Future[] _futures;
   // Use a _blockingQueue to store the per-segment result
-  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
+  protected int _numThreads;
+  protected Future[] _futures;
 
   public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index ff40b2e..dadb23a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -79,6 +79,7 @@ public class GroupByCombineOperator extends BaseCombineOperator {
 
   public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
       long endTimeMs, int innerSegmentNumGroupsLimit) {
+    // GroupByCombineOperator use numOperators as numThreads
     super(operators, queryContext, executorService, endTimeMs, operators.size());
     _innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
     _interSegmentNumGroupsLimit =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 99ba9df..38f1a54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -79,6 +79,7 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
 
   public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
       ExecutorService executorService, long endTimeMs, int trimThreshold) {
+    // GroupByOrderByCombineOperator use numOperators as numThreads
     super(operators, queryContext, executorService, endTimeMs, operators.size());
     _initLock = new ReentrantLock();
     _trimSize = GroupByUtils.getTableCapacity(_queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 901d6bc..29359f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -22,11 +22,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.PriorityQueue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,6 +37,8 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -54,18 +53,14 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombineOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MinMaxValueBasedSelectionOrderByCombineOperator.class);
   private static final String OPERATOR_NAME = "MinMaxValueBasedSelectionOrderByCombineOperator";
-
   // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
   // special IntermediateResultsBlock into the BlockingQueue to awake the main thread
   private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
       new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
           Collections.emptyList());
 
-  private final int _numOperators;
-  private final int numThreads;
-  // Use a BlockingQueue to store the per-segment result
-  private final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
   // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
   private final AtomicInteger _numOperatorsSkipped = new AtomicInteger();
   private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();
@@ -76,9 +71,6 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
       ExecutorService executorService, long endTimeMs, List<MinMaxValueContext> minMaxValueContexts) {
     super(operators, queryContext, executorService, endTimeMs);
     _minMaxValueContexts = minMaxValueContexts;
-    _numOperators = _operators.size();
-    numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
-    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
     _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
   }
 
@@ -118,7 +110,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
       //       segment result is merged.
       Comparable threadBoundaryValue = null;
 
-      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += numThreads) {
+      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
         // Calculate the boundary value from global boundary and thread boundary
         Comparable boundaryValue = _globalBoundaryValue.get();
         if (boundaryValue == null) {
@@ -146,7 +138,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
             if (minMaxValueContext._minValue != null) {
               int result = minMaxValueContext._minValue.compareTo(boundaryValue);
               if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
-                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads);
+                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
                 _blockingQueue.offer(LAST_RESULTS_BLOCK);
                 return;
               }
@@ -157,7 +149,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
             if (minMaxValueContext._maxValue != null) {
               int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
               if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
-                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / numThreads);
+                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
                 _blockingQueue.offer(LAST_RESULTS_BLOCK);
                 return;
               }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
index 2da4c56..f2d4d06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java
@@ -27,6 +27,8 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -37,6 +39,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
  */
 @SuppressWarnings("rawtypes")
 public class SelectionOnlyCombineOperator extends BaseCombineOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOnlyCombineOperator.class);
   private static final String OPERATOR_NAME = "SelectionOnlyCombineOperator";
 
   private final int _numRowsToKeep;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index be899fb..dfd34e9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -32,6 +32,8 @@ import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -43,6 +45,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class SelectionOrderByCombineOperator extends BaseCombineOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOrderByCombineOperator.class);
   private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator";
 
   private final List<Operator> _operators;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org