You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original page) was not preserved in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/28 00:55:22 UTC

[pinot] branch master updated: Enhance early terminate for combine operator (#10988)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 04bdb9a397 Enhance early terminate for combine operator (#10988)
04bdb9a397 is described below

commit 04bdb9a39743569aaacfcb07e2cc7eaa08854791
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jun 27 17:55:17 2023 -0700

    Enhance early terminate for combine operator (#10988)
---
 .../core/operator/combine/BaseCombineOperator.java | 17 ++++++++++++--
 .../combine/BaseSingleBlockCombineOperator.java    | 19 ++++------------
 .../operator/combine/GroupByCombineOperator.java   | 26 ++++++----------------
 ...xValueBasedSelectionOrderByCombineOperator.java |  2 +-
 .../streaming/BaseStreamingCombineOperator.java    | 18 +++++----------
 .../streaming/StreamingGroupByCombineOperator.java | 24 ++++++--------------
 6 files changed, 39 insertions(+), 67 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 9390925bc9..b4babb28a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -19,17 +19,21 @@
 package org.apache.pinot.core.operator.combine;
 
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.util.QueryMultiThreadingUtils;
 import org.apache.pinot.core.util.trace.TraceRunnable;
@@ -48,9 +52,10 @@ import org.slf4j.LoggerFactory;
  * detects that the merged results can already satisfy the query, or the query is already errored out or timed out.
  */
 @SuppressWarnings({"rawtypes"})
-public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends BaseOperator<T> {
+public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends BaseOperator<BaseResultsBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
 
+  protected final ResultsBlockMerger<T> _resultsBlockMerger;
   protected final List<Operator> _operators;
   protected final int _numOperators;
   protected final QueryContext _queryContext;
@@ -58,11 +63,19 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
   protected final int _numTasks;
   protected final Phaser _phaser;
   protected final Future[] _futures;
+
   // Use an AtomicInteger to track the next operator to execute
   protected final AtomicInteger _nextOperatorId = new AtomicInteger();
+  // Use a BlockingQueue to store the intermediate results blocks
+  protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
+  // Use an AtomicReference to track the exception/error during segment processing
+  protected final AtomicReference<Throwable> _processingException = new AtomicReference<>();
+
   protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0);
 
-  protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
+  protected BaseCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators,
+      QueryContext queryContext, ExecutorService executorService) {
+    _resultsBlockMerger = resultsBlockMerger;
     _operators = operators;
     _numOperators = _operators.size();
     _queryContext = queryContext;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
index 7366a61b69..f3dd847fba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
@@ -19,12 +19,8 @@
 package org.apache.pinot.core.operator.combine;
 
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
@@ -46,20 +42,12 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock>
-    extends BaseCombineOperator<BaseResultsBlock> {
+    extends BaseCombineOperator<T> {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseSingleBlockCombineOperator.class);
 
-  // Use an AtomicInteger to track the next operator to execute
-  protected final AtomicInteger _nextOperatorId = new AtomicInteger();
-  // Use a BlockingQueue to store the intermediate results blocks
-  protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
-  protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0);
-  protected final ResultsBlockMerger<T> _resultsBlockMerger;
-
   protected BaseSingleBlockCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators,
       QueryContext queryContext, ExecutorService executorService) {
-    super(operators, queryContext, executorService);
-    _resultsBlockMerger = resultsBlockMerger;
+    super(resultsBlockMerger, operators, queryContext, executorService);
   }
 
   @Override
@@ -92,7 +80,7 @@ public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock>
   @Override
   protected void processSegments() {
     int operatorId;
-    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+    while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
       Operator operator = _operators.get(operatorId);
       T resultsBlock;
       try {
@@ -116,6 +104,7 @@ public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock>
 
   @Override
   protected void onProcessSegmentsException(Throwable t) {
+    _processingException.compareAndSet(null, t);
     _blockingQueue.offer(new ExceptionResultsBlock(t));
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 0ae665c26d..88fe1a702d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -18,18 +18,14 @@
  */
 package org.apache.pinot.core.operator.combine;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
@@ -69,7 +65,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
   private final int _numAggregationFunctions;
   private final int _numGroupByExpressions;
   private final int _numColumns;
-  private final ConcurrentLinkedQueue<ProcessingException> _mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
   // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished
   // _futures (try to interrupt the execution if it already started).
   private final CountDownLatch _operatorLatch;
@@ -129,7 +124,7 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
   @Override
   protected void processSegments() {
     int operatorId;
-    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+    while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
       Operator operator = _operators.get(operatorId);
       try {
         if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
@@ -155,12 +150,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
           }
         }
 
-        // Merge processing exceptions.
-        List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
-        if (processingExceptionsToMerge != null) {
-          _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
-        }
-
         // Set groups limit reached flag.
         if (resultsBlock.isNumGroupsLimitReached()) {
           _numGroupsLimitReached = true;
@@ -209,7 +198,7 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
 
   @Override
   public void onProcessSegmentsException(Throwable t) {
-    _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t));
+    _processingException.compareAndSet(null, t);
   }
 
   @Override
@@ -244,6 +233,11 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
       return new ExceptionResultsBlock(new TimeoutException(errorMessage));
     }
 
+    Throwable processingException = _processingException.get();
+    if (processingException != null) {
+      return new ExceptionResultsBlock(processingException);
+    }
+
     IndexedTable indexedTable = _indexedTable;
     if (!_queryContext.isServerReturnFinalResult()) {
       indexedTable.finish(false);
@@ -254,12 +248,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
     mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
     mergedBlock.setNumResizes(indexedTable.getNumResizes());
     mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs());
-
-    // Set the processing exceptions.
-    if (!_mergedProcessingExceptions.isEmpty()) {
-      mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
-    }
-
     return mergedBlock;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 666332d5ac..a5e744fc6d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -147,7 +147,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator
     Comparable threadBoundaryValue = null;
 
     int operatorId;
-    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+    while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
       if (operatorId >= _endOperatorId.get()) {
         _blockingQueue.offer(EMPTY_RESULTS_BLOCK);
         continue;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java
index 96680d124b..10c547d780 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java
@@ -19,9 +19,7 @@
 package org.apache.pinot.core.operator.streaming;
 
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.exception.QueryException;
@@ -42,25 +40,18 @@ import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class BaseStreamingCombineOperator<T extends BaseResultsBlock>
-    extends BaseCombineOperator<BaseResultsBlock> {
+    extends BaseCombineOperator<T> {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamingCombineOperator.class);
 
-  /**
-   * Special results block to indicate that this is the last results block for a child operator in the list
-   */
+  // Use a special results block to indicate that this is the last results block for a child operator in the list
   public static final MetadataResultsBlock LAST_RESULTS_BLOCK = new MetadataResultsBlock();
 
-  // Use a BlockingQueue to store the intermediate results blocks
-  protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
-  protected final ResultsBlockMerger<T> _resultsBlockMerger;
-
   protected int _numOperatorsFinished;
   protected boolean _querySatisfied;
 
   public BaseStreamingCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators,
       QueryContext queryContext, ExecutorService executorService) {
-    super(operators, queryContext, executorService);
-    _resultsBlockMerger = resultsBlockMerger;
+    super(resultsBlockMerger, operators, queryContext, executorService);
   }
 
   /**
@@ -115,7 +106,7 @@ public abstract class BaseStreamingCombineOperator<T extends BaseResultsBlock>
   protected void processSegments() {
     int operatorId;
     Object tracker = createQuerySatisfiedTracker();
-    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+    while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
       Operator<T> operator = _operators.get(operatorId);
       try {
         if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
@@ -152,6 +143,7 @@ public abstract class BaseStreamingCombineOperator<T extends BaseResultsBlock>
 
   @Override
   protected void onProcessSegmentsException(Throwable t) {
+    _processingException.compareAndSet(null, t);
     _blockingQueue.offer(new ExceptionResultsBlock(t));
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
index 8b04d70265..53d0a3820b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
@@ -18,18 +18,15 @@
  */
 package org.apache.pinot.core.operator.streaming;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
@@ -73,7 +70,6 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato
   private final int _numAggregationFunctions;
   private final int _numGroupByExpressions;
   private final int _numColumns;
-  private final ConcurrentLinkedQueue<ProcessingException> _mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
   // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished
   // _futures (try to interrupt the execution if it already started).
   private final CountDownLatch _operatorLatch;
@@ -156,7 +152,7 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato
   @Override
   public void processSegments() {
     int operatorId;
-    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
+    while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
       Operator operator = _operators.get(operatorId);
       try {
         if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
@@ -182,12 +178,6 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato
           }
         }
 
-        // Merge processing exceptions.
-        List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
-        if (processingExceptionsToMerge != null) {
-          _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
-        }
-
         // Set groups limit reached flag.
         if (resultsBlock.isNumGroupsLimitReached()) {
           _numGroupsLimitReached = true;
@@ -248,6 +238,11 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato
       return new ExceptionResultsBlock(new TimeoutException(errorMessage));
     }
 
+    Throwable processingException = _processingException.get();
+    if (processingException != null) {
+      return new ExceptionResultsBlock(processingException);
+    }
+
     IndexedTable indexedTable = _indexedTable;
     if (!_queryContext.isServerReturnFinalResult()) {
       indexedTable.finish(false);
@@ -258,17 +253,12 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato
     mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
     mergedBlock.setNumResizes(indexedTable.getNumResizes());
     mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs());
-
-    // Set the processing exceptions.
-    if (!_mergedProcessingExceptions.isEmpty()) {
-      mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
-    }
     return mergedBlock;
   }
 
   @Override
   public void onProcessSegmentsException(Throwable t) {
-    _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t));
+    _processingException.compareAndSet(null, t);
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org