You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/05 19:05:40 UTC

[pinot] branch master updated: support partial operator chain execution (#9711)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aa013a432a support partial operator chain execution (#9711)
aa013a432a is described below

commit aa013a432a3464e658398326a3a98ff045fcfc9a
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Sat Nov 5 12:05:32 2022 -0700

    support partial operator chain execution (#9711)
    
    This is the first PR in a series of PRs to improve our execution model. It implements "partial execution" of operator chains by allowing them to return a "noop" `MetadataBlock` in the scenario where there is either no data to process or no data to output.
    
    This PR does not change behavior, because the `WorkerQueryExecutor` doesn't yet take advantage of the partial execution ability - it simply calls `operator#nextBlock` again whenever it receives a noop block.
---
 .../pinot/common/datablock/DataBlockUtils.java     |  10 +-
 .../pinot/common/datablock/MetadataBlock.java      | 110 ++++++++++++++++--
 .../pinot/common/datablock/MetadataBlockTest.java  | 128 +++++++++++++++++++++
 .../pinot/core/common/datablock/DataBlockTest.java |   1 -
 .../apache/pinot/query/runtime/QueryRunner.java    |   2 +-
 .../query/runtime/blocks/TransferableBlock.java    |  31 +++--
 .../runtime/blocks/TransferableBlockUtils.java     |  13 ++-
 .../runtime/executor/WorkerQueryExecutor.java      |  12 +-
 .../query/runtime/operator/AggregateOperator.java  | 103 +++++++++--------
 .../query/runtime/operator/FilterOperator.java     |  24 ++--
 .../query/runtime/operator/HashJoinOperator.java   |  91 ++++++++-------
 .../runtime/operator/LiteralValueOperator.java     |   2 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  64 ++++++-----
 .../runtime/operator/MailboxSendOperator.java      |  15 ++-
 .../pinot/query/runtime/operator/SortOperator.java |  30 +++--
 .../query/runtime/operator/TransformOperator.java  |  32 +++---
 .../apache/pinot/query/service/QueryConfig.java    |   2 +-
 .../pinot/query/service/QueryDispatcher.java       |  31 +++--
 .../query/mailbox/InMemoryMailboxServiceTest.java  |   2 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |   2 +-
 .../query/runtime/TransferableBlockUtilsTest.java  |   2 +-
 .../runtime/operator/AggregateOperatorTest.java    |   3 +
 .../runtime/operator/HashJoinOperatorTest.java     |  15 ++-
 .../query/runtime/operator/OperatorTestUtil.java   |   6 +-
 24 files changed, 511 insertions(+), 220 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index f3cf116142..1dc8da3747 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -47,16 +47,20 @@ public final class DataBlockUtils {
   }
 
   public static MetadataBlock getErrorDataBlock(Map<Integer, String> exceptions) {
-    MetadataBlock errorBlock = new MetadataBlock();
+    MetadataBlock errorBlock = new MetadataBlock(MetadataBlock.MetadataBlockType.ERROR);
     for (Map.Entry<Integer, String> exception : exceptions.entrySet()) {
       errorBlock.addException(exception.getKey(), exception.getValue());
     }
     return errorBlock;
   }
 
-  public static MetadataBlock getEndOfStreamDataBlock(DataSchema dataSchema) {
+  public static MetadataBlock getEndOfStreamDataBlock() {
     // TODO: add query statistics metadata for the block.
-    return new MetadataBlock(dataSchema);
+    return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
+  }
+
+  public static MetadataBlock getNoOpBlock() {
+    return new MetadataBlock(MetadataBlock.MetadataBlockType.NOOP);
   }
 
   public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
index 8b063e4024..33f7ee965b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
@@ -18,28 +18,118 @@
  */
 package org.apache.pinot.common.datablock;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Wrapper for row-wise data table. It stores data in row-major format.
+ * A block type to indicate some metadata about the current processing state.
+ * For the different types of metadata blocks see {@link MetadataBlockType}.
  */
 public class MetadataBlock extends BaseDataBlock {
-  private static final int VERSION = 1;
 
-  public MetadataBlock() {
-    super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+  private static final ObjectMapper JSON = new ObjectMapper();
+
+  @VisibleForTesting
+  static final int VERSION = 1;
+
+  public enum MetadataBlockType {
+    /**
+     * Indicates that this block is the final block to be sent
+     * (End Of Stream) as part of an operator chain computation.
+     */
+    EOS,
+
+    /**
+     * An {@code ERROR} metadata block indicates that there was
+     * some error during computation. To retrieve the error that
+     * occurred, use {@link MetadataBlock#getExceptions()}
+     */
+    ERROR,
+
+    /**
+     * A {@code NOOP} metadata block can be sent at any point
+     * and should be ignored by downstream - it is often used to
+     * indicate that the operator chain either has nothing to process
+     * or has processed data but is not yet ready to emit a result
+     * block.
+     */
+    NOOP;
+
+    MetadataBlockType() {
+    }
   }
 
-  public MetadataBlock(DataSchema dataSchema) {
-    super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
+  /**
+   * Used to serialize the contents of the metadata block conveniently and in
+   * a backwards compatible way. Use JSON because the performance of metadata block
+   * SerDe should not be a bottleneck.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @VisibleForTesting
+  static class Contents {
+
+    private String _type;
+
+    @JsonCreator
+    public Contents(@JsonProperty("type") String type) {
+      _type = type;
+    }
+
+    @JsonCreator
+    public Contents() {
+      _type = null;
+    }
+
+    public String getType() {
+      return _type;
+    }
+
+    public void setType(String type) {
+      _type = type;
+    }
+  }
+
+  private final Contents _contents;
+
+  public MetadataBlock(MetadataBlockType type) {
+    super(0, null, new String[0], new byte[]{0}, toContents(new Contents(type.name())));
+    _contents = new Contents(type.name());
+  }
+
+  private static byte[] toContents(Contents type) {
+    try {
+      return JSON.writeValueAsBytes(type);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public MetadataBlock(ByteBuffer byteBuffer)
       throws IOException {
     super(byteBuffer);
+    if (_variableSizeDataBytes != null) {
+      _contents = JSON.readValue(_variableSizeDataBytes, Contents.class);
+    } else {
+      _contents = new Contents();
+    }
+  }
+
+  public MetadataBlockType getType() {
+    String type = _contents.getType();
+
+    // if type is null, then we're reading a legacy block where we didn't encode any
+    // data. assume that it is an EOS block if there are no exceptions and an ERROR block
+    // otherwise
+    return type == null
+        ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
+        : MetadataBlockType.valueOf(type);
   }
 
   @Override
@@ -49,12 +139,12 @@ public class MetadataBlock extends BaseDataBlock {
 
   @Override
   protected int getOffsetInFixedBuffer(int rowId, int colId) {
-    throw new UnsupportedOperationException();
+    throw new UnsupportedOperationException("Metadata block uses JSON encoding for field access");
   }
 
   @Override
   protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
-    throw new UnsupportedOperationException();
+    throw new UnsupportedOperationException("Metadata block uses JSON encoding for field access");
   }
 
   @Override
@@ -64,6 +154,6 @@ public class MetadataBlock extends BaseDataBlock {
 
   @Override
   public MetadataBlock toDataOnlyDataTable() {
-    return new MetadataBlock(_dataSchema);
+    throw new UnsupportedOperationException();
   }
 }
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/datablock/MetadataBlockTest.java b/pinot-common/src/test/java/org/apache/pinot/common/datablock/MetadataBlockTest.java
new file mode 100644
index 0000000000..f350e2c3e6
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/datablock/MetadataBlockTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.common.datatable.DataTable;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+public class MetadataBlockTest {
+
+  @Test
+  public void shouldEncodeContentsAsJSON()
+      throws Exception {
+    // Given:
+    MetadataBlock.MetadataBlockType type = MetadataBlock.MetadataBlockType.EOS;
+
+    // When:
+    MetadataBlock metadataBlock = new MetadataBlock(type);
+
+    // Then:
+    byte[] expected = new ObjectMapper().writeValueAsBytes(new MetadataBlock.Contents("EOS"));
+    assertEquals(metadataBlock._variableSizeDataBytes, expected);
+  }
+
+  @Test
+  public void shouldDefaultToEosWithNoErrorsOnLegacyMetadataBlock()
+      throws IOException {
+    // Given:
+    // MetadataBlock used to be encoded without any data; we should make sure that,
+    // during rollout or if server versions are mismatched, we can still handle
+    // the old format
+    OldMetadataBlock legacyBlock = new OldMetadataBlock();
+    byte[] bytes = legacyBlock.toBytes();
+
+    // When:
+    ByteBuffer buff = ByteBuffer.wrap(bytes);
+    buff.getInt(); // consume the version information before decoding
+    MetadataBlock metadataBlock = new MetadataBlock(buff);
+
+    // Then:
+    assertEquals(metadataBlock.getType(), MetadataBlock.MetadataBlockType.EOS);
+  }
+
+  @Test
+  public void shouldDefaultToErrorOnLegacyMetadataBlockWithErrors()
+      throws IOException {
+    // Given:
+    // MetadataBlock used to be encoded without any data; we should make sure that,
+    // during rollout or if server versions are mismatched, we can still handle
+    // the old format
+    OldMetadataBlock legacyBlock = new OldMetadataBlock();
+    legacyBlock.addException(250, "timeout");
+    byte[] bytes = legacyBlock.toBytes();
+
+    // When:
+    ByteBuffer buff = ByteBuffer.wrap(bytes);
+    buff.getInt(); // consume the version information before decoding
+    MetadataBlock metadataBlock = new MetadataBlock(buff);
+
+    // Then:
+    assertEquals(metadataBlock.getType(), MetadataBlock.MetadataBlockType.ERROR);
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void shouldThrowExceptionWhenUsingReadMethods() {
+    // Given:
+    MetadataBlock block = new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
+
+    // When:
+    // (should throw exception)
+    block.getInt(0, 0);
+  }
+
+  /**
+   * This is mostly just used as an internal serialization tool
+   */
+  private static class OldMetadataBlock extends BaseDataBlock {
+
+    public OldMetadataBlock() {
+      super(0, null, new String[0], new byte[0], new byte[0]);
+    }
+
+    @Override
+    protected int getDataBlockVersionType() {
+      return MetadataBlock.VERSION + (Type.METADATA.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+    }
+
+    @Override
+    protected int getOffsetInFixedBuffer(int rowId, int colId) {
+      return 0;
+    }
+
+    @Override
+    protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
+      return 0;
+    }
+
+    @Override
+    public DataTable toMetadataOnlyDataTable() {
+      return null;
+    }
+
+    @Override
+    public DataTable toDataOnlyDataTable() {
+      return null;
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 73c47e1b76..9ff0db3334 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -58,7 +58,6 @@ public class DataBlockTest {
 
     BaseDataBlock dataBlock = DataBlockUtils.getErrorDataBlock(originalException);
     dataBlock.addException(processingException);
-    Assert.assertNull(dataBlock.getDataSchema());
     Assert.assertEquals(dataBlock.getNumberOfRows(), 0);
 
     // Assert processing exception and original exception both matches.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1a9a8cd88c..c409126b4a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -253,7 +253,7 @@ public class QueryRunner {
           return new TransferableBlock(_baseDataBlocks.get(_currentIndex++));
         } else {
           _currentIndex = -1;
-          return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_dataSchema));
+          return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
         }
       }
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 93508ffac6..856914b787 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pinot.query.runtime.blocks;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
 import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.datablock.ColumnarDataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datablock.RowDataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Block;
@@ -41,23 +41,15 @@ public class TransferableBlock implements Block {
 
   private final BaseDataBlock.Type _type;
   private final DataSchema _dataSchema;
-  private final boolean _isErrorBlock;
   private final int _numRows;
 
   private BaseDataBlock _dataBlock;
   private List<Object[]> _container;
 
   public TransferableBlock(List<Object[]> container, DataSchema dataSchema, BaseDataBlock.Type containerType) {
-    this(container, dataSchema, containerType, false);
-  }
-
-  @VisibleForTesting
-  TransferableBlock(List<Object[]> container, DataSchema dataSchema, BaseDataBlock.Type containerType,
-      boolean isErrorBlock) {
     _container = container;
     _dataSchema = dataSchema;
     _type = containerType;
-    _isErrorBlock = isErrorBlock;
     _numRows = _container.size();
   }
 
@@ -66,7 +58,6 @@ public class TransferableBlock implements Block {
     _dataSchema = dataBlock.getDataSchema();
     _type = dataBlock instanceof ColumnarDataBlock ? BaseDataBlock.Type.COLUMNAR
         : dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : BaseDataBlock.Type.METADATA;
-    _isErrorBlock = !_dataBlock.getExceptions().isEmpty();
     _numRows = _dataBlock.getNumberOfRows();
   }
 
@@ -149,7 +140,14 @@ public class TransferableBlock implements Block {
    * @return whether this block is the end of stream.
    */
   public boolean isEndOfStreamBlock() {
-    return _type == BaseDataBlock.Type.METADATA;
+    return isType(MetadataBlock.MetadataBlockType.ERROR) || isType(MetadataBlock.MetadataBlockType.EOS);
+  }
+
+  /**
+   * @return whether this block represents a NOOP block
+   */
+  public boolean isNoOpBlock() {
+    return isType(MetadataBlock.MetadataBlockType.NOOP);
   }
 
   /**
@@ -158,7 +156,16 @@ public class TransferableBlock implements Block {
    * @return true if contains exception.
    */
   public boolean isErrorBlock() {
-    return _isErrorBlock;
+    return isType(MetadataBlock.MetadataBlockType.ERROR);
+  }
+
+  private boolean isType(MetadataBlock.MetadataBlockType type) {
+    if (_type != BaseDataBlock.Type.METADATA) {
+      return false;
+    }
+
+    MetadataBlock metadata = (MetadataBlock) _dataBlock;
+    return metadata.getType() == type;
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index 2934f74759..caa25e70b3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.RowDataBlock;
-import org.apache.pinot.common.utils.DataSchema;
 
 
 public final class TransferableBlockUtils {
@@ -34,8 +33,12 @@ public final class TransferableBlockUtils {
     // do not instantiate.
   }
 
-  public static TransferableBlock getEndOfStreamTransferableBlock(DataSchema dataSchema) {
-    return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(dataSchema));
+  public static TransferableBlock getEndOfStreamTransferableBlock() {
+    return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+  }
+
+  public static TransferableBlock getNoOpTransferableBlock() {
+    return new TransferableBlock(DataBlockUtils.getNoOpBlock());
   }
 
   public static TransferableBlock getErrorTransferableBlock(Exception e) {
@@ -50,6 +53,10 @@ public final class TransferableBlockUtils {
     return transferableBlock.isEndOfStreamBlock();
   }
 
+  public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
+    return transferableBlock.isNoOpBlock();
+  }
+
   /**
    *  Split a block into multiple block so that each block size is within maxBlockSize.
    *  Currently, we only support split for row type dataBlock.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 2436061aa9..18bb2defa5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -79,11 +79,15 @@ public class WorkerQueryExecutor {
     executorService.submit(new TraceRunnable() {
       @Override
       public void runJob() {
-        ThreadTimer executionThreadTimer = new ThreadTimer();
-        while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
-          LOGGER.debug("Result Block acquired");
+        try {
+          ThreadTimer executionThreadTimer = new ThreadTimer();
+          while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
+            LOGGER.debug("Result Block acquired");
+          }
+          LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
+        } catch (Exception e) {
+          LOGGER.error("Failed to execute query!", e);
         }
-        LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
       }
     });
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 56f2b9087b..3edd821b07 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -65,7 +65,9 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
   private final Map<Key, Object>[] _groupByResultHolders;
   private final Map<Key, Object[]> _groupByKeyHolder;
   private TransferableBlock _upstreamErrorBlock;
-  private boolean _isCumulativeBlockConstructed;
+
+  private boolean _readyToConstruct;
+  private boolean _hasReturnedAggregateBlock;
 
   // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
   // aggCalls has to be a list of FunctionCall and cannot be null
@@ -95,7 +97,8 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
     }
     _resultSchema = dataSchema;
 
-    _isCumulativeBlockConstructed = false;
+    _readyToConstruct = false;
+    _hasReturnedAggregateBlock = false;
   }
 
   private RexExpression toAggregationFunctionOperand(RexExpression rexExpression) {
@@ -119,8 +122,16 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
   @Override
   protected TransferableBlock getNextBlock() {
     try {
-      consumeInputBlocks();
-      return produceAggregatedBlock();
+      if (!_readyToConstruct) {
+        consumeInputBlocks();
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      if (!_hasReturnedAggregateBlock) {
+        return produceAggregatedBlock();
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
@@ -131,58 +142,52 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
     }
-    if (!_isCumulativeBlockConstructed) {
-      List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
-      for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
-        Object[] row = new Object[_aggCalls.size() + _groupSet.size()];
-        Object[] keyElements = e.getValue();
-        for (int i = 0; i < keyElements.length; i++) {
-          row[i] = keyElements[i];
-        }
-        for (int i = 0; i < _groupByResultHolders.length; i++) {
-          row[i + _groupSet.size()] = _groupByResultHolders[i].get(e.getKey());
-        }
-        rows.add(row);
-      }
-      _isCumulativeBlockConstructed = true;
-      if (rows.size() == 0) {
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
-      } else {
-        return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
+
+    List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[_aggCalls.size() + _groupSet.size()];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _groupByResultHolders.length; i++) {
+        row[i + _groupSet.size()] = _groupByResultHolders[i].get(e.getKey());
       }
+      rows.add(row);
+    }
+    _hasReturnedAggregateBlock = true;
+    if (rows.size() == 0) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
+      return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
     }
   }
 
   private void consumeInputBlocks() {
-    if (!_isCumulativeBlockConstructed) {
-      TransferableBlock block = _inputOperator.nextBlock();
-      while (!TransferableBlockUtils.isEndOfStream(block)) {
-        BaseDataBlock dataBlock = block.getDataBlock();
-        int numRows = dataBlock.getNumberOfRows();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
-          Key key = extraRowKey(row, _groupSet);
-          _groupByKeyHolder.put(key, key.getValues());
-          for (int i = 0; i < _aggCalls.size(); i++) {
-            Object currentRes = _groupByResultHolders[i].get(key);
-            // TODO: fix that single agg result (original type) has different type from multiple agg results (double).
-            if (currentRes == null) {
-              _groupByResultHolders[i].put(key, _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
-                  : row[_aggregationFunctionInputRefs[i]]);
-            } else {
-              _groupByResultHolders[i].put(key, merge(_aggCalls.get(i), currentRes,
-                  _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
-                      : row[_aggregationFunctionInputRefs[i]]));
-            }
-          }
+    TransferableBlock block = _inputOperator.nextBlock();
+    // setting upstream error block
+    if (block.isErrorBlock()) {
+      _upstreamErrorBlock = block;
+    } else if (block.isEndOfStreamBlock()) {
+      _readyToConstruct = true;
+      return;
+    }
+
+    BaseDataBlock dataBlock = block.getDataBlock();
+    int numRows = dataBlock.getNumberOfRows();
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+      Key key = extraRowKey(row, _groupSet);
+      _groupByKeyHolder.put(key, key.getValues());
+      for (int i = 0; i < _aggCalls.size(); i++) {
+        Object currentRes = _groupByResultHolders[i].get(key);
+        // TODO: fix that single agg result (original type) has different type from multiple agg results (double).
+        if (currentRes == null) {
+          _groupByResultHolders[i].put(key, _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
+              : row[_aggregationFunctionInputRefs[i]]);
+        } else {
+          _groupByResultHolders[i].put(key, merge(_aggCalls.get(i), currentRes,
+              _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
+                  : row[_aggregationFunctionInputRefs[i]]));
         }
-        block = _inputOperator.nextBlock();
-      }
-      // setting upstream error block
-      if (block.isErrorBlock()) {
-        _upstreamErrorBlock = block;
       }
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index b3aa17ac56..5620cec86f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
@@ -71,21 +70,20 @@ public class FilterOperator extends BaseOperator<TransferableBlock> {
       throws Exception {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
-    }
-    if (!TransferableBlockUtils.isEndOfStream(block)) {
-      List<Object[]> resultRows = new ArrayList<>();
-      List<Object[]> container = block.getContainer();
-      for (Object[] row : container) {
-        if (_filterOperand.apply(row)) {
-          resultRows.add(row);
-        }
-      }
-      return new TransferableBlock(resultRows, _dataSchema, BaseDataBlock.Type.ROW);
     } else if (block.isErrorBlock()) {
       _upstreamErrorBlock = block;
       return _upstreamErrorBlock;
-    } else {
-      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_dataSchema));
+    } else if (TransferableBlockUtils.isEndOfStream(block) || TransferableBlockUtils.isNoOpBlock(block)) {
+      return block;
+    }
+
+    List<Object[]> resultRows = new ArrayList<>();
+    List<Object[]> container = block.getContainer();
+    for (Object[] row : container) {
+      if (_filterOperand.apply(row)) {
+        resultRows.add(row);
+      }
     }
+    return new TransferableBlock(resultRows, _dataSchema, BaseDataBlock.Type.ROW);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 61cd18ff5a..1c3111cbb9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -25,7 +25,6 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.Key;
@@ -99,11 +98,17 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    // Build JOIN hash table
-    buildBroadcastHashTable();
+    if (!_isHashTableBuilt) {
+      // Build JOIN hash table
+      buildBroadcastHashTable();
+    }
+
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (!_isHashTableBuilt) {
+      return TransferableBlockUtils.getNoOpTransferableBlock();
     }
+
     // JOIN each left block with the right block.
     try {
       return buildJoinedDataBlock(_leftTableOperator.nextBlock());
@@ -113,54 +118,58 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   }
 
   private void buildBroadcastHashTable() {
-    if (!_isHashTableBuilt) {
-      TransferableBlock rightBlock = _rightTableOperator.nextBlock();
-      while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
-        List<Object[]> container = rightBlock.getContainer();
-        // put all the rows into corresponding hash collections keyed by the key selector function.
-        for (Object[] row : container) {
-          List<Object[]> hashCollection =
-              _broadcastHashTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
-          hashCollection.add(row);
-        }
-        rightBlock = _rightTableOperator.nextBlock();
-      }
-      if (rightBlock.isErrorBlock()) {
-        _upstreamErrorBlock = rightBlock;
-      }
+    TransferableBlock rightBlock = _rightTableOperator.nextBlock();
+    if (rightBlock.isErrorBlock()) {
+      _upstreamErrorBlock = rightBlock;
+      return;
+    }
+
+    if (TransferableBlockUtils.isEndOfStream(rightBlock)) {
       _isHashTableBuilt = true;
+      return;
+    } else if (TransferableBlockUtils.isNoOpBlock(rightBlock)) {
+      return;
+    }
+
+    List<Object[]> container = rightBlock.getContainer();
+    // put all the rows into corresponding hash collections keyed by the key selector function.
+    for (Object[] row : container) {
+      List<Object[]> hashCollection =
+          _broadcastHashTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
+      hashCollection.add(row);
     }
   }
 
   private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
       throws Exception {
-    if (!TransferableBlockUtils.isEndOfStream(leftBlock)) {
-      List<Object[]> rows = new ArrayList<>();
-      List<Object[]> container = leftBlock.getContainer();
-      for (Object[] leftRow : container) {
-        List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
-            new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
-        // If it is a left join and right table is empty, we return left rows.
-        if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) {
-          rows.add(joinRow(leftRow, null));
-        } else {
-          // If it is other type of join.
-          for (Object[] rightRow : hashCollection) {
-            Object[] resultRow = joinRow(leftRow, rightRow);
-            if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
-              evaluator -> evaluator.apply(resultRow))) {
-              rows.add(resultRow);
-            }
+    if (leftBlock.isErrorBlock()) {
+      _upstreamErrorBlock = leftBlock;
+      return _upstreamErrorBlock;
+    } else if (TransferableBlockUtils.isEndOfStream(leftBlock) || TransferableBlockUtils.isNoOpBlock(leftBlock)) {
+      return leftBlock;
+    }
+
+    List<Object[]> rows = new ArrayList<>();
+    List<Object[]> container = leftBlock.getContainer();
+    for (Object[] leftRow : container) {
+      List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
+          new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
+      // If it is a left join and no right row matches the key, we still return the left row.
+      if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) {
+        rows.add(joinRow(leftRow, null));
+      } else {
+        // Otherwise, join the left row with every matching right row.
+        for (Object[] rightRow : hashCollection) {
+          Object[] resultRow = joinRow(leftRow, rightRow);
+          if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
+            evaluator -> evaluator.apply(resultRow))) {
+            rows.add(resultRow);
           }
         }
       }
-      return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
-    } else if (leftBlock.isErrorBlock()) {
-      _upstreamErrorBlock = leftBlock;
-      return _upstreamErrorBlock;
-    } else {
-      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
     }
+
+    return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
   }
 
   private Object[] joinRow(Object[] leftRow, @Nullable Object[] rightRow) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 9ad6c6fb22..223ab4a04a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -61,7 +61,7 @@ public class LiteralValueOperator extends BaseOperator<TransferableBlock> {
       _isLiteralBlockReturned = true;
       return _rexLiteralBlock;
     } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 781eaccc2c..d45d5bceb1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -58,6 +58,8 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private final long _jobId;
   private final int _stageId;
   private final long _timeout;
+
+  private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
   public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
@@ -90,9 +92,10 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
     _port = port;
     _jobId = jobId;
     _stageId = stageId;
-    _timeout = QueryConfig.DEFAULT_TIMEOUT_NANO;
+    _timeout = System.nanoTime() + QueryConfig.DEFAULT_TIMEOUT_NANO;
     _upstreamErrorBlock = null;
     _keySelector = keySelector;
+    _serverIdx = 0;
   }
 
   @Override
@@ -111,37 +114,46 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _timeout) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
-    // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
-    boolean hasOpenedMailbox = true;
-    long timeoutWatermark = System.nanoTime() + _timeout;
-    while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
-      hasOpenedMailbox = false;
-      for (ServerInstance sendingInstance : _sendingStageInstances) {
-        try {
-          ReceivingMailbox<TransferableBlock> receivingMailbox =
-              _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
-          // TODO this is not threadsafe.
-          // make sure only one thread is checking receiving mailbox and calling receive() then close()
-          if (!receivingMailbox.isClosed()) {
-            hasOpenedMailbox = true;
-            TransferableBlock transferableBlock = receivingMailbox.receive();
-            if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
-              // Return the block only if it has some valid data
-              return transferableBlock;
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosCount = 0;
+
+    for (int i = 0; i < _sendingStageInstances.size(); i++) {
+      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
+
+      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+      try {
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        if (!mailbox.isClosed()) {
+          openMailboxCount++;
+
+          // this is blocking for 100ms and may return null
+          TransferableBlock block = mailbox.receive();
+          if (block != null) {
+            if (!block.isEndOfStreamBlock()) {
+              return block;
             }
+            eosCount++;
           }
-        } catch (Exception e) {
-          LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e);
         }
+      } catch (Exception e) {
+        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
       }
     }
-    if (System.nanoTime() >= timeoutWatermark) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
-      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
-    }
+
+    // if we found at least one open mailbox, but still got to this point, then that means
+    // every open mailbox returned either null or an EOS block - early terminate with a
+    // noop block. Otherwise, all mailboxes are closed, we have exhausted their data and
+    // can return EOS
+    return openMailboxCount > 0
+        ? TransferableBlockUtils.getNoOpTransferableBlock()
+        : TransferableBlockUtils.getEndOfStreamTransferableBlock();
   }
 
   public RelDistribution.Type getExchangeType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index a53864ab56..a26fa19626 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -114,7 +114,20 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    TransferableBlock transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+    TransferableBlock transferableBlock;
+    try {
+      transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+    } catch (final Exception e) {
+      // ideally, MailboxSendOperator doesn't ever throw an exception because
+      // it will just get swallowed; in this scenario at least we can forward
+      // any upstream exceptions as an error block
+      transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+
+    if (TransferableBlockUtils.isNoOpBlock(transferableBlock)) {
+      return transferableBlock;
+    }
+
     boolean isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
     BaseDataBlock.Type type = transferableBlock.getType();
     try {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 1acb0c9a69..e710122d81 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -44,6 +44,7 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
   private final PriorityQueue<Object[]> _rows;
   private final int _numRowsToKeep;
 
+  private boolean _readyToConstruct;
   private boolean _isSortedBlockConstructed;
   private TransferableBlock _upstreamErrorBlock;
 
@@ -87,7 +88,10 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
       throws IOException {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (!_readyToConstruct) {
+      return TransferableBlockUtils.getNoOpTransferableBlock();
     }
+
     if (!_isSortedBlockConstructed) {
       LinkedList<Object[]> rows = new LinkedList<>();
       while (_rows.size() > _offset) {
@@ -96,30 +100,34 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
       }
       _isSortedBlockConstructed = true;
       if (rows.size() == 0) {
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       } else {
         return new TransferableBlock(rows, _dataSchema, BaseDataBlock.Type.ROW);
       }
     } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     }
   }
 
   private void consumeInputBlocks() {
     if (!_isSortedBlockConstructed) {
       TransferableBlock block = _upstreamOperator.nextBlock();
-      while (!TransferableBlockUtils.isEndOfStream(block)) {
-        BaseDataBlock dataBlock = block.getDataBlock();
-        int numRows = dataBlock.getNumberOfRows();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
-          SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
-        }
-        block = _upstreamOperator.nextBlock();
-      }
       // setting upstream error block
       if (block.isErrorBlock()) {
         _upstreamErrorBlock = block;
+        return;
+      } else if (TransferableBlockUtils.isEndOfStream(block)) {
+        _readyToConstruct = true;
+        return;
+      } else if (TransferableBlockUtils.isNoOpBlock(block)) {
+        return;
+      }
+
+      BaseDataBlock dataBlock = block.getDataBlock();
+      int numRows = dataBlock.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+        SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
       }
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 90efc377ab..a530f0bc88 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
@@ -77,25 +76,26 @@ public class TransformOperator extends BaseOperator<TransferableBlock> {
 
   private TransferableBlock transform(TransferableBlock block)
       throws Exception {
+    if (block.isErrorBlock()) {
+      _upstreamErrorBlock = block;
+    }
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
     }
-    if (!TransferableBlockUtils.isEndOfStream(block)) {
-      List<Object[]> resultRows = new ArrayList<>();
-      List<Object[]> container = block.getContainer();
-      for (Object[] row : container) {
-        Object[] resultRow = new Object[_resultColumnSize];
-        for (int i = 0; i < _resultColumnSize; i++) {
-          resultRow[i] = _transformOperandsList.get(i).apply(row);
-        }
-        resultRows.add(resultRow);
+
+    if (TransferableBlockUtils.isEndOfStream(block) || TransferableBlockUtils.isNoOpBlock(block)) {
+      return block;
+    }
+
+    List<Object[]> resultRows = new ArrayList<>();
+    List<Object[]> container = block.getContainer();
+    for (Object[] row : container) {
+      Object[] resultRow = new Object[_resultColumnSize];
+      for (int i = 0; i < _resultColumnSize; i++) {
+        resultRow[i] = _transformOperandsList.get(i).apply(row);
       }
-      return new TransferableBlock(resultRows, _resultSchema, BaseDataBlock.Type.ROW);
-    } else if (block.isErrorBlock()) {
-      _upstreamErrorBlock = block;
-      return _upstreamErrorBlock;
-    } else {
-      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
+      resultRows.add(resultRow);
     }
+    return new TransferableBlock(resultRows, _resultSchema, BaseDataBlock.Type.ROW);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index 76e10e92dc..3b357466e0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -22,7 +22,7 @@ package org.apache.pinot.query.service;
  * Configuration for setting up query runtime.
  */
 public class QueryConfig {
-  public static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
+  public static final long DEFAULT_TIMEOUT_NANO = 100_000_000_000L;
 
   public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = "pinot.query.runner.max.msg.size.bytes";
   public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = 16 * 1024 * 1024;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index b29700dde0..229ede0c16 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -22,14 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.util.Pair;
-import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
@@ -75,7 +72,8 @@ public class QueryDispatcher {
         requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(),
         mailboxService.getMailboxPort());
     List<DataTable> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
-    return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields());
+    return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
+        queryPlan.getQueryStageMap().get(0).getDataSchema());
   }
 
   public int submit(long requestId, QueryPlan queryPlan)
@@ -137,26 +135,23 @@ public class QueryDispatcher {
           throw new RuntimeException("Received error query execution result block: "
               + transferableBlock.getDataBlock().getExceptions());
       }
-      if (transferableBlock.getDataBlock() != null) {
-        BaseDataBlock dataTable = transferableBlock.getDataBlock();
-        resultDataBlocks.add(dataTable);
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
       }
-      if (transferableBlock.isEndOfStreamBlock()) {
-        break;
-      }
-    }
-    if (System.nanoTime() >= timeoutWatermark) {
-      resultDataBlocks = Collections.singletonList(
-          DataBlockUtils.getErrorDataBlock(QueryException.EXECUTION_TIMEOUT_ERROR));
+
+      resultDataBlocks.add(transferableBlock.getDataBlock());
     }
-    return resultDataBlocks;
+
+    throw new RuntimeException("Timed out while receiving from mailbox: " + QueryException.EXECUTION_TIMEOUT_ERROR);
   }
 
-  public static ResultTable toResultTable(List<DataTable> queryResult, List<Pair<Integer, String>> fields) {
-    DataSchema resultSchema = null;
+  public static ResultTable toResultTable(List<DataTable> queryResult, List<Pair<Integer, String>> fields,
+      DataSchema sourceSchema) {
     List<Object[]> resultRows = new ArrayList<>();
+    DataSchema resultSchema = toResultSchema(sourceSchema, fields);
     for (DataTable dataTable : queryResult) {
-      resultSchema = resultSchema == null ? toResultSchema(dataTable.getDataSchema(), fields) : resultSchema;
       int numColumns = resultSchema.getColumnNames().length;
       int numRows = dataTable.getNumberOfRows();
       DataSchema.ColumnDataType[] resultColumnDataTypes = resultSchema.getColumnDataTypes();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 6f41434e03..a51b9c8c0c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -95,7 +95,7 @@ public class InMemoryMailboxServiceTest {
 
   private TransferableBlock getTestTransferableBlock(int index, boolean isEndOfStream) {
     if (isEndOfStream) {
-      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(TEST_DATA_SCHEMA));
+      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
     }
     List<Object[]> rows = new ArrayList<>(index);
     rows.add(new Object[]{index, "test_data"});
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 3c985d77aa..b614b49b7f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -85,7 +85,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);
     return QueryDispatcher.toResultTable(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator),
-        queryPlan.getQueryResultFields()).getRows();
+        queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
   }
 
   private List<Object[]> queryH2(String sql)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
index 4fa2c8aa9c..0f9ac7fdf8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
@@ -88,7 +88,7 @@ public class TransferableBlockUtilsTest {
     validateNonSplittableBlock(columnarBlock);
 
     // METADATA
-    MetadataBlock metadataBlock = new MetadataBlock();
+    MetadataBlock metadataBlock = new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
     validateNonSplittableBlock(metadataBlock);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 97e27e10ab..d467ba8112 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -56,6 +56,9 @@ public class AggregateOperatorTest {
         new AggregateOperator(_upstreamOperator, OperatorTestUtil.TEST_DATA_SCHEMA, Arrays.asList(agg),
             Arrays.asList(new RexExpression.InputRef(1)));
     TransferableBlock result = sum0GroupBy1.getNextBlock();
+    while (result.isNoOpBlock()) {
+      result = sum0GroupBy1.getNextBlock();
+    }
     List<Object[]> resultRows = result.getContainer();
     List<Object[]> expectedRows = Arrays.asList(new Object[]{"Aa", 1}, new Object[]{"BB", 5.0});
     Assert.assertEquals(resultRows.size(), expectedRows.size());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 768371636d..9975cd57ff 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -71,7 +71,10 @@ public class HashJoinOperatorTest {
     HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
 
-    TransferableBlock result = join.getNextBlock();
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
     List<Object[]> resultRows = result.getContainer();
     List<Object[]> expectedRows =
         Arrays.asList(new Object[]{1, "Aa", 1, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"},
@@ -101,7 +104,10 @@ public class HashJoinOperatorTest {
     HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
 
-    TransferableBlock result = join.getNextBlock();
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
     List<Object[]> resultRows = result.getContainer();
     Object[] expRow = new Object[]{1, "Aa", 2, "Aa"};
     List<Object[]> expectedRows = new ArrayList<>();
@@ -127,7 +133,10 @@ public class HashJoinOperatorTest {
     HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT);
 
-    TransferableBlock result = join.getNextBlock();
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
     List<Object[]> resultRows = result.getContainer();
     List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", null, null},
         new Object[]{3, "BB", null, null});
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index da8a9cc599..aaa190a3d4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -33,11 +33,11 @@ public class OperatorTestUtil {
       new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
 
   public static TransferableBlock getEndOfStreamRowBlock() {
-    return getEndOfStreamRowBlockWithSchema(TEST_DATA_SCHEMA);
+    return getEndOfStreamRowBlockWithSchema();
   }
 
-  public static TransferableBlock getEndOfStreamRowBlockWithSchema(DataSchema schema) {
-    return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(schema));
+  public static TransferableBlock getEndOfStreamRowBlockWithSchema() {
+    return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
   }
 
   public static TransferableBlock getRowDataBlock(List<Object[]> rows) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org