You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/09 02:54:57 UTC

(pinot) branch master updated: [multistage][bugfix] fix operator eos pull (#11970)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 73d82ec666 [multistage][bugfix] fix operator eos pull (#11970)
73d82ec666 is described below

commit 73d82ec6660dd5195bf4b2f07c06fbd1a9c5e96c
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Nov 8 18:54:50 2023 -0800

    [multistage][bugfix] fix operator eos pull (#11970)
    
    * clean up single input stop-the-world operator
    * clean up 2-input stop-the-world on right, stream on left operator
    * clean up single-input stream operator
    * refactor try-catch block into base class; and fix set op right-side error handling
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../query/runtime/operator/AggregateOperator.java  |  27 ++----
 .../query/runtime/operator/HashJoinOperator.java   |  31 +++---
 .../LeafStageTransferableBlockOperator.java        |  37 ++++---
 .../query/runtime/operator/MultiStageOperator.java |  16 ++-
 .../query/runtime/operator/OperatorStats.java      |   2 -
 .../pinot/query/runtime/operator/SetOperator.java  |  18 +++-
 .../pinot/query/runtime/operator/SortOperator.java | 107 +++++++++------------
 .../query/runtime/operator/TransformOperator.java  |  24 +----
 .../runtime/operator/WindowAggregateOperator.java  |  21 ++--
 9 files changed, 127 insertions(+), 156 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 5cb825c149..39577ba408 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -70,7 +70,7 @@ public class AggregateOperator extends MultiStageOperator {
   private final MultistageAggregationExecutor _aggregationExecutor;
   private final MultistageGroupByExecutor _groupByExecutor;
 
-  private boolean _hasReturnedAggregateBlock;
+  private boolean _hasConstructedAggregateBlock;
 
   public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator, DataSchema resultSchema,
       List<RexExpression> aggCalls, List<RexExpression> groupSet, AggType aggType, List<Integer> filterArgIndices,
@@ -131,26 +131,19 @@ public class AggregateOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    try {
-      TransferableBlock finalBlock = _aggregationExecutor != null ? consumeAggregation() : consumeGroupBy();
-
-      // setting upstream error block
-      if (finalBlock.isErrorBlock()) {
-        return finalBlock;
-      }
-
-      if (!_hasReturnedAggregateBlock) {
-        return produceAggregatedBlock();
-      } else {
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-      }
-    } catch (Exception e) {
-      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    if (_hasConstructedAggregateBlock) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+    TransferableBlock finalBlock = _aggregationExecutor != null ? consumeAggregation() : consumeGroupBy();
+    // returning upstream error block if finalBlock contains error.
+    if (finalBlock.isErrorBlock()) {
+      return finalBlock;
     }
+    return produceAggregatedBlock();
   }
 
   private TransferableBlock produceAggregatedBlock() {
-    _hasReturnedAggregateBlock = true;
+    _hasConstructedAggregateBlock = true;
     if (_aggregationExecutor != null) {
       return new TransferableBlock(_aggregationExecutor.getResult(), _resultSchema, DataBlock.Type.ROW);
     } else {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index c508b373c5..96645c8882 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -188,24 +188,21 @@ public class HashJoinOperator extends MultiStageOperator {
   }
 
   @Override
-  protected TransferableBlock getNextBlock() {
-    try {
-      if (_isTerminated) {
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-      }
-      if (!_isHashTableBuilt) {
-        // Build JOIN hash table
-        buildBroadcastHashTable();
-      }
-      if (_upstreamErrorBlock != null) {
-        return _upstreamErrorBlock;
-      }
-      TransferableBlock leftBlock = _leftTableOperator.nextBlock();
-      // JOIN each left block with the right block.
-      return buildJoinedDataBlock(leftBlock);
-    } catch (Exception e) {
-      return TransferableBlockUtils.getErrorTransferableBlock(e);
+  protected TransferableBlock getNextBlock()
+      throws ProcessingException {
+    if (_isTerminated) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+    if (!_isHashTableBuilt) {
+      // Build JOIN hash table
+      buildBroadcastHashTable();
+    }
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
     }
+    TransferableBlock leftBlock = _leftTableOperator.nextBlock();
+    // JOIN each left block with the constructed right hash table.
+    return buildJoinedDataBlock(leftBlock);
   }
 
   private void buildBroadcastHashTable()
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 1a109358ae..ea5a47df98 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -112,29 +112,26 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   }
 
   @Override
-  protected TransferableBlock getNextBlock() {
+  protected TransferableBlock getNextBlock()
+      throws InterruptedException, TimeoutException {
     if (_executionFuture == null) {
       _executionFuture = startExecution();
     }
-    try {
-      BaseResultsBlock resultsBlock =
-          _blockingQueue.poll(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-      if (resultsBlock == null) {
-        throw new TimeoutException("Timed out waiting for results block");
-      }
-      // Terminate when receiving exception block
-      Map<Integer, String> exceptions = _exceptions;
-      if (exceptions != null) {
-        return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
-      }
-      if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
-        return constructMetadataBlock();
-      } else {
-        // Regular data block
-        return composeTransferableBlock(resultsBlock, _dataSchema);
-      }
-    } catch (Exception e) {
-      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    BaseResultsBlock resultsBlock =
+        _blockingQueue.poll(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    if (resultsBlock == null) {
+      throw new TimeoutException("Timed out waiting for results block");
+    }
+    // Terminate when receiving exception block
+    Map<Integer, String> exceptions = _exceptions;
+    if (exceptions != null) {
+      return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
+    }
+    if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
+      return constructMetadataBlock();
+    } else {
+      // Regular data block
+      return composeTransferableBlock(resultsBlock, _dataSchema);
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index ade326ea20..581e2f8f10 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
 import java.util.List;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.trace.InvocationScope;
@@ -55,11 +56,19 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
       if (shouldCollectStats()) {
         OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
         operatorStats.startTimer();
-        nextBlock = getNextBlock();
+        try {
+          nextBlock = getNextBlock();
+        } catch (Exception e) {
+          nextBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+        }
         operatorStats.recordRow(1, nextBlock.getNumRows());
         operatorStats.endTimer(nextBlock);
       } else {
-        nextBlock = getNextBlock();
+        try {
+          nextBlock = getNextBlock();
+        } catch (Exception e) {
+          nextBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+        }
       }
       return nextBlock;
     }
@@ -70,7 +79,8 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
   }
 
   // Make it protected because we should always call nextBlock()
-  protected abstract TransferableBlock getNextBlock();
+  protected abstract TransferableBlock getNextBlock()
+      throws Exception;
 
   protected void earlyTerminate() {
     _isEarlyTerminated = true;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 1aacb03290..32fa9f140f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -43,7 +43,6 @@ public class OperatorStats {
   private long _startTimeMs = -1;
   private long _endTimeMs = -1;
   private final Map<String, String> _executionStats;
-  private boolean _processingStarted = false;
 
   public OperatorStats(OpChainExecutionContext context) {
     this(context.getRequestId(), context.getStageId(), context.getServer());
@@ -69,7 +68,6 @@ public class OperatorStats {
       _executeStopwatch.stop();
       _endTimeMs = System.currentTimeMillis();
     }
-    _processingStarted = true;
   }
 
   public void recordRow(int numBlock, int numRows) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
index 2c45382672..edf8416f02 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -50,6 +50,7 @@ public abstract class SetOperator extends MultiStageOperator {
   private final DataSchema _dataSchema;
 
   private boolean _isRightSetBuilt;
+  private boolean _isTerminated;
   private TransferableBlock _upstreamErrorBlock;
 
   public SetOperator(OpChainExecutionContext opChainExecutionContext, List<MultiStageOperator> upstreamOperators,
@@ -60,6 +61,8 @@ public abstract class SetOperator extends MultiStageOperator {
     _leftChildOperator = getChildOperators().get(0);
     _rightChildOperator = getChildOperators().get(1);
     _rightRowSet = new HashSet<>();
+    _isRightSetBuilt = false;
+    _isTerminated = false;
   }
 
   @Override
@@ -89,10 +92,17 @@ public abstract class SetOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    // A blocking call to construct a set with all the right side rows.
+    if (_isTerminated) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
     if (!_isRightSetBuilt) {
+      // construct a SET with all the right side rows.
       constructRightBlockSet();
     }
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    }
+    // UNION each left block with the constructed right block set.
     TransferableBlock leftBlock = _leftChildOperator.nextBlock();
     return constructResultBlockSet(leftBlock);
   }
@@ -107,7 +117,11 @@ public abstract class SetOperator extends MultiStageOperator {
       }
       block = _rightChildOperator.nextBlock();
     }
-    _isRightSetBuilt = true;
+    if (block.isErrorBlock()) {
+      _upstreamErrorBlock = block;
+    } else {
+      _isRightSetBuilt = true;
+    }
   }
 
   protected TransferableBlock constructResultBlockSet(TransferableBlock leftBlock) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 64f0926a63..ce4ddf130f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -51,8 +51,7 @@ public class SortOperator extends MultiStageOperator {
   private final ArrayList<Object[]> _rows;
   private final int _numRowsToKeep;
 
-  private boolean _hasReturnedSortedBlock;
-  private TransferableBlock _upstreamErrorBlock;
+  private boolean _hasConstructedSortedBlock;
 
   public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
       List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
@@ -73,8 +72,7 @@ public class SortOperator extends MultiStageOperator {
     _fetch = fetch;
     _offset = Math.max(offset, 0);
     _dataSchema = dataSchema;
-    _upstreamErrorBlock = null;
-    _hasReturnedSortedBlock = false;
+    _hasConstructedSortedBlock = false;
     // Setting numRowsToKeep as default maximum on Broker if limit not set.
     // TODO: make this default behavior configurable.
     _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit;
@@ -110,74 +108,65 @@ public class SortOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    try {
-      consumeInputBlocks();
-      return produceSortedBlock();
-    } catch (Exception e) {
-      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    if (_hasConstructedSortedBlock) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+    TransferableBlock finalBlock = consumeInputBlocks();
+    // returning upstream error block if finalBlock contains error.
+    if (finalBlock.isErrorBlock()) {
+      return finalBlock;
     }
+    return produceSortedBlock();
   }
 
   private TransferableBlock produceSortedBlock() {
-    if (_upstreamErrorBlock != null) {
-      return _upstreamErrorBlock;
-    }
-
-    if (!_hasReturnedSortedBlock) {
-      _hasReturnedSortedBlock = true;
-      if (_priorityQueue == null) {
-        if (_rows.size() > _offset) {
-          List<Object[]> row = _rows.subList(_offset, _rows.size());
-          return new TransferableBlock(row, _dataSchema, DataBlock.Type.ROW);
-        } else {
-          return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-        }
+    _hasConstructedSortedBlock = true;
+    if (_priorityQueue == null) {
+      if (_rows.size() > _offset) {
+        List<Object[]> row = _rows.subList(_offset, _rows.size());
+        return new TransferableBlock(row, _dataSchema, DataBlock.Type.ROW);
       } else {
-        LinkedList<Object[]> rows = new LinkedList<>();
-        while (_priorityQueue.size() > _offset) {
-          Object[] row = _priorityQueue.poll();
-          rows.addFirst(row);
-        }
-        if (rows.size() == 0) {
-          return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-        } else {
-          return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
-        }
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
     } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      LinkedList<Object[]> rows = new LinkedList<>();
+      while (_priorityQueue.size() > _offset) {
+        Object[] row = _priorityQueue.poll();
+        rows.addFirst(row);
+      }
+      if (rows.size() == 0) {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      } else {
+        return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+      }
     }
   }
 
-  private void consumeInputBlocks() {
-    if (!_hasReturnedSortedBlock) {
-      TransferableBlock block = _upstreamOperator.nextBlock();
-      while (block.isDataBlock()) {
-        List<Object[]> container = block.getContainer();
-        if (_priorityQueue == null) {
-          // TODO: when push-down properly, we shouldn't get more than _numRowsToKeep
-          int numRows = _rows.size();
-          if (numRows < _numRowsToKeep) {
-            if (numRows + container.size() < _numRowsToKeep) {
-              _rows.addAll(container);
-            } else {
-              _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
-              LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId,
-                  _context.getId());
-              // setting operator to be early terminated and awaits EOS block next.
-              earlyTerminate();
-            }
-          }
-        } else {
-          for (Object[] row : container) {
-            SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
+  private TransferableBlock consumeInputBlocks() {
+    TransferableBlock block = _upstreamOperator.nextBlock();
+    while (block.isDataBlock()) {
+      List<Object[]> container = block.getContainer();
+      if (_priorityQueue == null) {
+        // TODO: when push-down properly, we shouldn't get more than _numRowsToKeep
+        int numRows = _rows.size();
+        if (numRows < _numRowsToKeep) {
+          if (numRows + container.size() < _numRowsToKeep) {
+            _rows.addAll(container);
+          } else {
+            _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
+            LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId,
+                _context.getId());
+            // setting operator to be early terminated and awaits EOS block next.
+            earlyTerminate();
           }
         }
-        block = _upstreamOperator.nextBlock();
-      }
-      if (block.isErrorBlock()) {
-        _upstreamErrorBlock = block;
+      } else {
+        for (Object[] row : container) {
+          SelectionOperatorUtils.addToPriorityQueue(row, _priorityQueue, _numRowsToKeep);
+        }
       }
+      block = _upstreamOperator.nextBlock();
     }
+    return block;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index f769ceef5a..234f32fbcd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperandFactory;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -51,7 +50,6 @@ public class TransformOperator extends MultiStageOperator {
   private final int _resultColumnSize;
   // TODO: Check type matching between resultSchema and the actual result.
   private final DataSchema _resultSchema;
-  private TransferableBlock _upstreamErrorBlock;
 
   public TransformOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
       DataSchema resultSchema, List<RexExpression> transforms, DataSchema upstreamDataSchema) {
@@ -81,28 +79,10 @@ public class TransformOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    try {
-      TransferableBlock block = _upstreamOperator.nextBlock();
-      return transform(block);
-    } catch (RuntimeException e) {
-      return TransferableBlockUtils.getErrorTransferableBlock(e);
-    }
-  }
-
-  private TransferableBlock transform(TransferableBlock block) {
-    // TODO: Other operators keep the first erroneous block, while this keep the last.
-    //  We should decide what is what we want to do and be consistent with that.
-    if (block.isErrorBlock()) {
-      _upstreamErrorBlock = block;
-    }
-    if (_upstreamErrorBlock != null) {
-      return _upstreamErrorBlock;
-    }
-
-    if (TransferableBlockUtils.isEndOfStream(block)) {
+    TransferableBlock block = _upstreamOperator.nextBlock();
+    if (block.isEndOfStreamBlock()) {
       return block;
     }
-
     List<Object[]> container = block.getContainer();
     List<Object[]> resultRows = new ArrayList<>(container.size());
     for (Object[] row : container) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index ea9e065416..259abaea1b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -163,21 +163,14 @@ public class WindowAggregateOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    try {
-      TransferableBlock finalBlock = consumeInputBlocks();
-      if (finalBlock.isErrorBlock()) {
-        return finalBlock;
-      }
-
-      if (!_hasReturnedWindowAggregateBlock) {
-        return produceWindowAggregatedBlock();
-      } else {
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
-      }
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while executing WindowAggregationOperator, returning an error block", e);
-      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    if (_hasReturnedWindowAggregateBlock) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+    TransferableBlock finalBlock = consumeInputBlocks();
+    if (finalBlock.isErrorBlock()) {
+      return finalBlock;
     }
+    return produceWindowAggregatedBlock();
   }
 
   private void validateAggregationCalls(String functionName,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org