You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/16 00:14:49 UTC

[pinot] branch master updated: [multistage] fix set op issue (#10764)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 27eb9601fa [multistage] fix set op issue (#10764)
27eb9601fa is described below

commit 27eb9601fa7491820a7d0e5599de1426af94ffba
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon May 15 17:14:41 2023 -0700

    [multistage] fix set op issue (#10764)
    
    * fix set op issue: the operator should return one block for each block consumed from the left table
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/query/runtime/operator/SetOperator.java  | 54 ++++++++--------------
 1 file changed, 20 insertions(+), 34 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
index 40667abc3e..0130234d7d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.operator;
 
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -28,7 +27,6 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ExplainPlanRows;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.operator.ExecutionStatistics;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -49,13 +47,10 @@ public abstract class SetOperator extends MultiStageOperator {
   private final MultiStageOperator _leftChildOperator;
   private final MultiStageOperator _rightChildOperator;
 
-  private final List<Object[]> _resultRowBlock;
-
   private final DataSchema _dataSchema;
-  private TransferableBlock _currentLeftBlock;
-  private Iterator<Object[]> _currentLeftIterator;
 
-  private boolean _isRightBlockConsumed;
+  private boolean _isRightSetBuilt;
+  private TransferableBlock _upstreamErrorBlock;
 
   public SetOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
       DataSchema dataSchema) {
@@ -65,7 +60,6 @@ public abstract class SetOperator extends MultiStageOperator {
     _leftChildOperator = getChildOperators().get(0);
     _rightChildOperator = getChildOperators().get(1);
     _rightRowSet = new HashSet<>();
-    _resultRowBlock = new ArrayList<>();
   }
 
   @Override
@@ -96,10 +90,11 @@ public abstract class SetOperator extends MultiStageOperator {
   @Override
   protected TransferableBlock getNextBlock() {
     // A blocking call to construct a set with all the right side rows.
-    if (!_isRightBlockConsumed) {
+    if (!_isRightSetBuilt) {
       constructRightBlockSet();
     }
-    return constructResultBlockSet();
+    TransferableBlock leftBlock = _leftChildOperator.nextBlock();
+    return constructResultBlockSet(leftBlock);
   }
 
   protected void constructRightBlockSet() {
@@ -112,36 +107,27 @@ public abstract class SetOperator extends MultiStageOperator {
       }
       block = _rightChildOperator.nextBlock();
     }
-    _isRightBlockConsumed = true;
+    _isRightSetBuilt = true;
   }
 
-  protected TransferableBlock constructResultBlockSet() {
-    _resultRowBlock.clear();
-    // First time initialization.
-    if (_currentLeftBlock == null) {
-      _currentLeftBlock = _leftChildOperator.nextBlock();
+  protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) {
+    List<Object[]> rows = new ArrayList<>();
+    if (_upstreamErrorBlock != null || leftBlock.isErrorBlock()) {
+      _upstreamErrorBlock = leftBlock;
+      return _upstreamErrorBlock;
     }
-    while (!_currentLeftBlock.isEndOfStreamBlock()) {
-      if (_currentLeftBlock.getType() == DataBlock.Type.METADATA) {
-        _currentLeftBlock = _leftChildOperator.nextBlock();
-        continue;
-      }
-      _currentLeftIterator = _currentLeftBlock.getContainer().iterator();
-      while (_currentLeftIterator.hasNext()) {
-        Object[] row = _currentLeftIterator.next();
-        if (handleRowMatched(row)) {
-          _resultRowBlock.add(row);
-          if (_resultRowBlock.size() == SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY) {
-            return new TransferableBlock(_resultRowBlock, _dataSchema, DataBlock.Type.ROW);
-          }
-        }
+    if (leftBlock.isNoOpBlock() || leftBlock.isSuccessfulEndOfStreamBlock()) {
+      if (leftBlock.isSuccessfulEndOfStreamBlock()) {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
-      _currentLeftBlock = _leftChildOperator.nextBlock();
+      return leftBlock;
     }
-    if (!_resultRowBlock.isEmpty()) {
-      return new TransferableBlock(_resultRowBlock, _dataSchema, DataBlock.Type.ROW);
+    for (Object[] row : leftBlock.getContainer()) {
+      if (handleRowMatched(row)) {
+        rows.add(row);
+      }
     }
-    return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org