Posted to commits@pinot.apache.org by ro...@apache.org on 2022/06/14 22:34:45 UTC

[pinot] branch master updated: Use multi-stage DataBlock as DataTableImplV4 (#8874)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c802786ea9 Use multi-stage DataBlock as DataTableImplV4 (#8874)
c802786ea9 is described below

commit c802786ea95cff67b83ff4d24f796b965e565854
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Jun 14 15:34:40 2022 -0700

    Use multi-stage DataBlock as DataTableImplV4 (#8874)
    
    multi-stage DataBlock as DataTableImplV4
    
    * refactor data blocks into pinot-core
    * do not allow experimental data table version setting via server config
    * add multi-stage query engine integration test
    * address diff comments
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../core/common/datablock}/BaseDataBlock.java      |   2 +-
 .../core/common/datablock}/ColumnarDataBlock.java  |   2 +-
 .../core/common/datablock}/DataBlockBuilder.java   |   5 +-
 .../core/common/datablock}/DataBlockUtils.java     |  21 +---
 .../core/common/datablock}/MetadataBlock.java      |   2 +-
 .../pinot/core/common/datablock}/RowDataBlock.java |   2 +-
 .../pinot/core/common/datatable/BaseDataTable.java |   7 +-
 .../core/common/datatable/DataTableBuilder.java    |  69 ++++++----
 .../core/common/datatable/DataTableFactory.java    |   2 +
 .../core/common/datatable/DataTableImplV2.java     |   7 +-
 .../core/common/datatable/DataTableImplV3.java     |   7 +-
 ...{DataTableFactory.java => DataTableImplV4.java} |  39 +++---
 .../core/common/datatable/DataTableUtils.java      |  10 +-
 .../core/common/datablock}/DataBlockTest.java      |  51 +++++++-
 .../core/common/datablock}/DataBlockTestUtils.java |   2 +-
 .../core/common/datatable/DataTableSerDeTest.java  | 134 ++++++++++++++++++++
 .../tests/MultiStageEngineIntegrationTest.java     | 139 +++++++++++++++++++++
 .../apache/pinot/query/runtime/QueryRunner.java    |   7 +-
 .../query/runtime/blocks/TransferableBlock.java    |   3 +
 .../runtime/blocks/TransferableBlockUtils.java     |  51 ++++++++
 .../runtime/executor/WorkerQueryExecutor.java      |   4 +-
 .../query/runtime/operator/HashJoinOperator.java   |  13 +-
 .../runtime/operator/MailboxReceiveOperator.java   |   7 +-
 .../runtime/operator/MailboxSendOperator.java      |   9 +-
 .../pinot/query/service/QueryDispatcher.java       |   6 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      |   2 +-
 .../server/starter/helix/BaseServerStarter.java    |   9 +-
 27 files changed, 510 insertions(+), 102 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
similarity index 99%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 97d007b58b..607fdef8bd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
similarity index 98%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
index 8510043251..1d2429f118 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
similarity index 98%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 0c456c1d85..ff6cc4e43c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.common.datatable.DataTableUtils;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -73,7 +72,7 @@ public class DataBlockBuilder {
       }
     } else if (_blockType == BaseDataBlock.Type.ROW) {
       _columnOffsets = new int[_numColumns];
-      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+      _rowSizeInBytes = DataBlockUtils.computeColumnOffsets(dataSchema, _columnOffsets);
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
similarity index 85%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index 690be1353a..f1dedd49d0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,11 +37,6 @@ public final class DataBlockUtils {
   static {
     EOS_DATA_BLOCK._metadata.put(DataTable.MetadataKey.TABLE.getName(), "END_OF_STREAM");
   }
-  private static final TransferableBlock EOS_TRANSFERABLE_BLOCK = new TransferableBlock(EOS_DATA_BLOCK);
-
-  public static TransferableBlock getEndOfStreamTransferableBlock() {
-    return EOS_TRANSFERABLE_BLOCK;
-  }
 
   public static MetadataBlock getEndOfStreamDataBlock() {
     return EOS_DATA_BLOCK;
@@ -58,24 +53,10 @@ public final class DataBlockUtils {
     return errorBlock;
   }
 
-  public static TransferableBlock getErrorTransferableBlock(Exception e) {
-    return new TransferableBlock(getErrorDataBlock(e));
-  }
-
   public static MetadataBlock getEmptyDataBlock(DataSchema dataSchema) {
     return dataSchema == null ? EOS_DATA_BLOCK : new MetadataBlock(dataSchema);
   }
 
-  public static TransferableBlock getEmptyTransferableBlock(DataSchema dataSchema) {
-    return new TransferableBlock(getEmptyDataBlock(dataSchema));
-  }
-
-  public static boolean isEndOfStream(TransferableBlock transferableBlock) {
-    return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
-        && "END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
-            .get(DataTable.MetadataKey.TABLE.getName()));
-  }
-
   public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
       throws IOException {
     int versionType = byteBuffer.getInt();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
similarity index 97%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index a3ffc7a126..9be572d938 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
similarity index 98%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index f8520c54f6..bece1a7eaf 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 66f5086a01..788fe62765 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -58,7 +58,7 @@ public abstract class BaseDataTable implements DataTable {
     _numColumns = dataSchema.size();
     _dataSchema = dataSchema;
     _columnOffsets = new int[_numColumns];
-    _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+    _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets, getVersion());
     _dictionaryMap = dictionaryMap;
     _fixedSizeDataBytes = fixedSizeDataBytes;
     _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
@@ -84,6 +84,11 @@ public abstract class BaseDataTable implements DataTable {
     _metadata = new HashMap<>();
   }
 
+  /**
+   * Get the current data table version.
+   */
+  public abstract int getVersion();
+
   /**
    * Helper method to serialize dictionary map.
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index a0b046a813..05f29bd774 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -73,14 +73,20 @@ import org.apache.pinot.spi.utils.ByteArray;
  *
  *
  */
-// TODO: potential optimizations:
-// TODO:   1. Fix float size.
-// TODO:   2. Use one dictionary for all columns (save space).
-// TODO:   3. Given a data schema, write all values one by one instead of using rowId and colId to position (save time).
-// TODO:   4. Store bytes as variable size data instead of String
+// TODO: potential optimizations (DataTableV3 and below)
+// TODO:   - Fix float size.
+// TODO:     note:  (Fixed in V4, remove this comment once V3 is deprecated)
+// TODO:   - Given a data schema, write all values one by one instead of using rowId and colId to position (save time).
+// TODO:     note:  (Fixed in V4, remove this comment once V3 is deprecated)
+// TODO:   - Store bytes as variable size data instead of String
+// TODO:     note:  (Fixed in V4, remove this comment once V3 is deprecated)
+// TODO:   - Use a builder factory pattern for different versions so that there is no version check per build.
+// TODO:   - Use one dictionary for all columns (save space).
+
 public class DataTableBuilder {
   public static final int VERSION_2 = 2;
   public static final int VERSION_3 = 3;
+  public static final int VERSION_4 = 4;
   private static int _version = VERSION_3;
   private final DataSchema _dataSchema;
   private final int[] _columnOffsets;
@@ -98,15 +104,24 @@ public class DataTableBuilder {
   public DataTableBuilder(DataSchema dataSchema) {
     _dataSchema = dataSchema;
     _columnOffsets = new int[dataSchema.size()];
-    _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+    _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets, _version);
   }
 
   public static DataTable getEmptyDataTable() {
-    return _version == VERSION_2 ? new DataTableImplV2() : new DataTableImplV3();
+    switch (_version) {
+      case VERSION_2:
+        return new DataTableImplV2();
+      case VERSION_3:
+        return new DataTableImplV3();
+      case VERSION_4:
+        return new DataTableImplV4();
+      default:
+        throw new IllegalStateException("Unexpected value: " + _version);
+    }
   }
 
   public static void setCurrentDataTableVersion(int version) {
-    if (version != VERSION_2 && version != VERSION_3) {
+    if (version != VERSION_2 && version != VERSION_3 && version != VERSION_4) {
       throw new IllegalArgumentException("Unsupported version: " + version);
     }
     _version = version;
@@ -191,19 +206,16 @@ public class DataTableBuilder {
 
   public void setColumn(int colId, ByteArray value)
       throws IOException {
-    // NOTE: Use String to store bytes value in DataTable V2 for backward-compatibility
-    setColumn(colId, value.toHexString());
-
-    /*
-    TODO: Store bytes as variable size data instead of String. Make the change for the next version data table for
-          backward-compatibility
-
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    byte[] bytes = value.getBytes();
-    _currentRowDataByteBuffer.putInt(bytes.length);
-    _variableSizeDataByteArrayOutputStream.write(bytes);
-     */
+    if (_version >= 4) {
+      _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+      _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+      byte[] bytes = value.getBytes();
+      _currentRowDataByteBuffer.putInt(bytes.length);
+      _variableSizeDataByteArrayOutputStream.write(bytes);
+    } else {
+      // NOTE: Use String to store bytes value in DataTable V2 for backward-compatibility
+      setColumn(colId, value.toHexString());
+    }
   }
 
   public void setColumn(int colId, Object value)
@@ -288,9 +300,18 @@ public class DataTableBuilder {
   }
 
   public DataTable build() {
-    return _version == VERSION_2 ? new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap,
-        _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray())
-        : new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap,
+    switch (_version) {
+      case VERSION_2:
+        return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap,
             _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray());
+      case VERSION_3:
+        return new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap,
+            _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray());
+      case VERSION_4:
+        return new DataTableImplV4(_numRows, _dataSchema, _reverseDictionaryMap,
+            _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray());
+      default:
+        throw new IllegalStateException("Unexpected value: " + _version);
+    }
   }
 }
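
For a concrete sense of the new dispatch, here is a minimal usage sketch. It is hedged: it assumes the builder's usual startRow()/finishRow() flow, which this diff does not show. Once the version is switched to VERSION_4, the same builder code path produces a DataTableImplV4 backed by the row data block format:

    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.core.common.datatable.DataTableBuilder;

    public final class BuilderV4Sketch {
      public static void main(String[] args) throws Exception {
        DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
        DataSchema schema = new DataSchema(new String[]{"count"},
            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
        DataTableBuilder builder = new DataTableBuilder(schema);
        builder.startRow();
        builder.setColumn(0, 42L); // LONG column at colId 0
        builder.finishRow();
        DataTable dataTable = builder.build(); // now a DataTableImplV4
      }
    }
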
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
index 54e305edbc..4a39f110d9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
@@ -35,6 +35,8 @@ public class DataTableFactory {
         return new DataTableImplV2(byteBuffer);
       case DataTableBuilder.VERSION_3:
         return new DataTableImplV3(byteBuffer);
+      case DataTableBuilder.VERSION_4:
+        return new DataTableImplV4(byteBuffer);
       default:
         throw new UnsupportedOperationException("Unsupported data table version: " + version);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
index 1340b45e19..af3665a48e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
@@ -92,7 +92,7 @@ public class DataTableImplV2 extends BaseDataTable {
       byteBuffer.position(dataSchemaStart);
       _dataSchema = DataSchema.fromBytes(byteBuffer);
       _columnOffsets = new int[_dataSchema.size()];
-      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion());
     } else {
       _dataSchema = null;
       _columnOffsets = null;
@@ -122,6 +122,11 @@ public class DataTableImplV2 extends BaseDataTable {
     }
   }
 
+  @Override
+  public int getVersion() {
+    return DataTableBuilder.VERSION_2;
+  }
+
   private Map<String, String> deserializeMetadata(ByteBuffer buffer)
       throws IOException {
       int numEntries = buffer.getInt();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
index 7838c362fe..edaf1d5479 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -128,7 +128,7 @@ public class DataTableImplV3 extends BaseDataTable {
       byteBuffer.position(dataSchemaStart);
       _dataSchema = DataSchema.fromBytes(byteBuffer);
       _columnOffsets = new int[_dataSchema.size()];
-      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion());
     } else {
       _dataSchema = null;
       _columnOffsets = null;
@@ -164,6 +164,11 @@ public class DataTableImplV3 extends BaseDataTable {
     }
   }
 
+  @Override
+  public int getVersion() {
+    return DataTableBuilder.VERSION_3;
+  }
+
   @Override
   public void addException(ProcessingException processingException) {
     _errCodeToExceptionMap.put(processingException.getErrorCode(), processingException.getMessage());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
similarity index 53%
copy from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
copy to pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
index 54e305edbc..fe32ac84a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
@@ -16,32 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pinot.core.common.datatable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataTable;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.RowDataBlock;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+
 
+/**
+ * DataTable V4 implementation is a wrapper around the row-based data block.
+ */
+@InterfaceStability.Evolving
+public class DataTableImplV4 extends RowDataBlock {
 
-public class DataTableFactory {
-  private DataTableFactory() {
+  public DataTableImplV4() {
+    super();
   }
 
-  public static DataTable getDataTable(ByteBuffer byteBuffer)
+  public DataTableImplV4(ByteBuffer byteBuffer)
       throws IOException {
-    int version = byteBuffer.getInt();
-    switch (version) {
-      case DataTableBuilder.VERSION_2:
-        return new DataTableImplV2(byteBuffer);
-      case DataTableBuilder.VERSION_3:
-        return new DataTableImplV3(byteBuffer);
-      default:
-        throw new UnsupportedOperationException("Unsupported data table version: " + version);
-    }
+    super(byteBuffer);
   }
 
-  public static DataTable getDataTable(byte[] bytes)
-      throws IOException {
-    return getDataTable(ByteBuffer.wrap(bytes));
+  public DataTableImplV4(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes);
+  }
+
+  @Override
+  protected int getDataBlockVersionType() {
+    return DataTableBuilder.VERSION_4;
   }
 }
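
Because DataTableImplV4 simply extends RowDataBlock, the same byte payload can be decoded on either side of the wire. Below is a sketch of the dual read, mirroring the compatibility test added further down; the dataTable argument is assumed to have been built with the version set to VERSION_4, as in the builder sketch above:

    import java.nio.ByteBuffer;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.core.common.datablock.BaseDataBlock;
    import org.apache.pinot.core.common.datablock.DataBlockUtils;
    import org.apache.pinot.core.common.datatable.DataTableFactory;

    public final class DualReadSketch {
      static void roundTrip(DataTable dataTable) throws Exception {
        byte[] bytes = dataTable.toBytes();
        // Broker path: dispatch on the leading version int.
        DataTable viaFactory = DataTableFactory.getDataTable(bytes);
        // Multi-stage path: read the very same bytes as a row data block.
        BaseDataBlock viaBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(bytes));
      }
    }
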
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
index 684627f8f4..1430869f95 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
@@ -56,7 +56,7 @@ public class DataTableUtils {
    * @param columnOffsets array of column offsets.
    * @return row size in bytes.
    */
-  public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets) {
+  public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets, int dataTableVersion) {
     int numColumns = columnOffsets.length;
     assert numColumns == dataSchema.size();
 
@@ -71,10 +71,12 @@ public class DataTableUtils {
         case LONG:
           rowSizeInBytes += 8;
           break;
-        // TODO: fix float size (should be 4).
-        // For backward compatible, DON'T CHANGE.
         case FLOAT:
-          rowSizeInBytes += 8;
+          if (dataTableVersion >= 4) {
+            rowSizeInBytes += 4;
+          } else {
+            rowSizeInBytes += 8;
+          }
           break;
         case DOUBLE:
           rowSizeInBytes += 8;
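
The FLOAT fix has a simple arithmetic consequence. For an (INT, FLOAT, DOUBLE) schema the fixed-size row is 4 + 8 + 8 = 20 bytes under V3 (FLOAT padded to 8 for backward compatibility) but 4 + 4 + 8 = 16 bytes under V4, with column offsets {0, 4, 12} versus {0, 4, 8}. A small sketch against the new signature:

    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.core.common.datatable.DataTableBuilder;
    import org.apache.pinot.core.common.datatable.DataTableUtils;

    public final class RowSizeSketch {
      public static void main(String[] args) {
        DataSchema schema = new DataSchema(
            new String[]{"i", "f", "d"},
            new DataSchema.ColumnDataType[]{
                DataSchema.ColumnDataType.INT,
                DataSchema.ColumnDataType.FLOAT,
                DataSchema.ColumnDataType.DOUBLE});
        int[] offsets = new int[3];
        int v3Size = DataTableUtils.computeColumnOffsets(schema, offsets, DataTableBuilder.VERSION_3); // 20
        int v4Size = DataTableUtils.computeColumnOffsets(schema, offsets, DataTableBuilder.VERSION_4); // 16
        System.out.println("V3 row: " + v3Size + " bytes, V4 row: " + v4Size + " bytes");
      }
    }
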
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
similarity index 57%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index bf33160b92..4ede7f2cd7 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -16,19 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 
 public class DataBlockTest {
-  private static final int TEST_ROW_COUNT = 2;
+  private static final List<DataSchema.ColumnDataType> EXCLUDE_DATA_TYPES = ImmutableList.of(
+      DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.BYTES_ARRAY);
+  private static final int TEST_ROW_COUNT = 5;
 
   @Test
   public void testException()
@@ -51,9 +59,46 @@ public class DataBlockTest {
         originalException.getMessage());
   }
 
+  /**
+   * This test is only here to ensure that the data table produced by {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}
+   * can actually be wrapped and sent via a mailbox in the {@link RowDataBlock} format.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRowDataBlockCompatibleWithDataTableV4()
+      throws Exception {
+    DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
+    List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
+    List<String> columnNames = new ArrayList<String>();
+    for (int i = 0; i < allDataTypes.length; i++) {
+      if (!EXCLUDE_DATA_TYPES.contains(allDataTypes[i])) {
+        columnNames.add(allDataTypes[i].name());
+        columnDataTypes.add(allDataTypes[i]);
+      }
+    }
+
+    DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
+        columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
+    List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+    DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
+    DataTable dataBlockFromDataTable = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
+
+    for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
+      Object[] rowFromDataTable = SelectionOperatorUtils.extractRowFromDataTable(dataTableImpl, rowId);
+      Object[] rowFromBlock = SelectionOperatorUtils.extractRowFromDataTable(dataBlockFromDataTable, rowId);
+      for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
+        Assert.assertEquals(rowFromBlock[colId], rowFromDataTable[colId], "Error comparing row/column block"
+            + " at (" + rowId + "," + colId + ") of type: " + dataSchema.getColumnDataType(colId) + "!"
+            + " from DataBlock: [" + rowFromBlock[colId] + "], from DataTable: [" + rowFromDataTable[colId] + "]");
+      }
+    }
+  }
+
   @Test
   public void testAllDataTypes()
-      throws IOException {
+      throws Exception {
     DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
     int numColumns = columnDataTypes.length;
     String[] columnNames = new String[numColumns];
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
similarity index 99%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
rename to pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index caa9b55ccf..54f046d13f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index be2df6f749..8dbc3910f6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -163,6 +163,139 @@ public class DataTableSerDeTest {
     verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
   }
 
+  @Test
+  public void testV3V4Compatibility()
+      throws IOException {
+    DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
+    int numColumns = columnDataTypes.length;
+    String[] columnNames = new String[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      columnNames[i] = columnDataTypes[i].name();
+    }
+
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    // TODO: verify data table compatibility across multi-stage and normal query engine.
+    // TODO: see https://github.com/apache/pinot/pull/8874/files#r894806085
+
+    // Verify V4 broker can deserialize a data table (has data, but no metadata) sent by a V3 server
+    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
+    DataTableBuilder dataTableBuilderV3WithDataOnly = new DataTableBuilder(dataSchema);
+    fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
+
+    DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
+    DataTable newDataTable =
+        DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserializes the data table bytes as V3
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+    Assert.assertEquals(newDataTable.getMetadata().size(), 0);
+
+    // Verify V4 broker can deserialize a data table (has data and metadata) sent by a V3 server
+    for (String key : EXPECTED_METADATA.keySet()) {
+      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+    }
+    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserializes the data table bytes as V3
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+    // Verify V4 broker can deserialize a data table (only has metadata) sent by a V3 server
+    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new DataTableBuilder(dataSchema);
+    dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
+    for (String key : EXPECTED_METADATA.keySet()) {
+      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+    }
+    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserializes the data table bytes as V3
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+    // Verify V4 broker can deserialize a data table (has data, but no metadata) sent by a V4 server (with
+    // ThreadCpuTimeMeasurement disabled)
+    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+    DataTableBuilder dataTableBuilderV4WithDataOnly = new DataTableBuilder(dataSchema);
+    fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns);
+    DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
+    // Deserialize data table bytes as V4
+    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+    Assert.assertEquals(newDataTable.getMetadata().size(), 0);
+
+    // Verify V4 broker can deserialize a data table (has data and metadata) sent by a V4 server (with
+    // ThreadCpuTimeMeasurement disabled)
+    for (String key : EXPECTED_METADATA.keySet()) {
+      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+    }
+    // Deserialize data table bytes as V4
+    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserializes the data table bytes as V4
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+    // Verify V4 broker can deserialize a data table (only has metadata) sent by a V4 server (with
+    // ThreadCpuTimeMeasurement disabled)
+    DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = new DataTableBuilder(dataSchema);
+    dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table
+    for (String key : EXPECTED_METADATA.keySet()) {
+      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+    }
+    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserializes the data table bytes as V4
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+    // Verify V4 broker can deserialize a data table (has data, but no metadata) sent by a V4 server (with
+    // ThreadCpuTimeMeasurement enabled)
+    ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+    dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
+    // Deserialize data table bytes as V4
+    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+    Assert.assertEquals(newDataTable.getMetadata().size(), 1);
+    Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
+
+    // Verify V4 broker can deserialize a data table (has data and metadata) sent by a V4 server (with
+    // ThreadCpuTimeMeasurement enabled)
+    for (String key : EXPECTED_METADATA.keySet()) {
+      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+    }
+    // Deserialize data table bytes as V4
+    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserializes the data table bytes as V4
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+      Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
+      newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+    }
+    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+    // Verify V4 broker can deserialize a data table (only has metadata) sent by a V4 server (with
+    // ThreadCpuTimeMeasurement enabled)
+    dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table
+    for (String key : EXPECTED_METADATA.keySet()) {
+      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+    }
+    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserializes the data table bytes as V4
+    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, ERROR_MESSAGE);
+    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+      Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
+      newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+    }
+    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+  }
+
   @Test
   public void testV2V3Compatibility()
       throws IOException {
@@ -176,6 +309,7 @@ public class DataTableSerDeTest {
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
 
     // Verify V3 broker can deserialize data table (has data, but has no metadata) send by V2 server
+    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
     DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_2);
     DataTableBuilder dataTableBuilderV2WithDataOnly = new DataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, columnDataTypes, numColumns);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
new file mode 100644
index 0000000000..8bbf28e1ee
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest {
+  private static final String SCHEMA_FILE_NAME =
+      "On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
+
+  @Override
+  protected String getSchemaFileName() {
+    return SCHEMA_FILE_NAME;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+
+    // Setting data table version to 4
+    DataTableBuilder.setCurrentDataTableVersion(4);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
+    brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
+    serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842);
+    serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422);
+  }
+
+  @Test(dataProvider = "multiStageQueryEngineSqlTestSet")
+  public void testMultiStageQuery(String sql, int expectedNumOfRows, int expectedNumOfColumns)
+      throws IOException {
+    JsonNode multiStageResponse = JsonUtils.stringToJsonNode(
+        sendPostRequest(_brokerBaseApiUrl + "/query/sql", "{\"useMultistageEngine\": true, \"sql\":\"" + sql + "\"}"));
+    Assert.assertTrue(multiStageResponse.has("resultTable"));
+    ArrayNode jsonNode = (ArrayNode) multiStageResponse.get("resultTable").get("rows");
+    // TODO: assert actual result data payload.
+    Assert.assertEquals(jsonNode.size(), expectedNumOfRows);
+    Assert.assertEquals(jsonNode.get(0).size(), expectedNumOfColumns);
+  }
+
+  @DataProvider
+  public Object[][] multiStageQueryEngineSqlTestSet() {
+    return new Object[][] {
+        new Object[]{"SELECT * FROM mytable_OFFLINE", 10, 73},
+        new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE CarrierDelay=15 AND ArrDelay>20", 10, 2},
+        new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.AirlineID = b.AirlineID "
+            + " WHERE a.CarrierDelay=15 AND a.ArrDelay>20 AND b.ArrDelay<20", 10, 146}
+    };
+  }
+
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    // Reset data table version back to the default (3)
+    DataTableBuilder.setCurrentDataTableVersion(3);
+
+    dropOfflineTable(DEFAULT_TABLE_NAME);
+
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index e25c6e03b8..cf56c5b786 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -34,8 +36,6 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
 import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -101,7 +101,8 @@ public class QueryRunner {
       BaseDataBlock dataBlock;
       try {
         DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
-        // this works because default DataTableImplV3 will have ordinal 0, which maps to ROW(0)
+        // this works because the default DataTableImplV3 writes its version number at the beginning,
+        // which decodes as a ROW-type data block of version 3.
         dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
       } catch (IOException e) {
         throw new RuntimeException("Unable to convert byte buffer", e);
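
To make the comment above concrete: the block reader splits the leading int of the payload into a version (low bits) and a block type ordinal (high bits), so a plain DataTableImplV3 payload, which starts with the int 3, decodes as version 3 with ROW type ordinal 0. A minimal decode sketch follows; the shift constant here is a stand-in, the real one lives in DataBlockUtils and may differ:

    import java.nio.ByteBuffer;

    public final class VersionTypeSketch {
      // Hypothetical shift; the actual constant is defined in DataBlockUtils.
      private static final int VERSION_TYPE_SHIFT = 5;

      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(3); // the first int of a serialized DataTableImplV3
        buffer.flip();
        int versionType = buffer.getInt();
        int version = versionType & ((1 << VERSION_TYPE_SHIFT) - 1); // -> 3
        int typeOrdinal = versionType >>> VERSION_TYPE_SHIFT;        // -> 0, i.e. ROW
        System.out.println("version=" + version + ", type ordinal=" + typeOrdinal);
      }
    }
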
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 9284011f06..ae64cc90bb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -24,6 +24,9 @@ import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.ColumnarDataBlock;
+import org.apache.pinot.core.common.datablock.RowDataBlock;
 
 
 /**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
new file mode 100644
index 0000000000..f2992c41df
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
+
+
+public final class TransferableBlockUtils {
+  private TransferableBlockUtils() {
+    // do not instantiate.
+  }
+  private static final TransferableBlock EOS_TRANSFERABLE_BLOCK =
+      new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+
+  public static TransferableBlock getEndOfStreamTransferableBlock() {
+    return EOS_TRANSFERABLE_BLOCK;
+  }
+
+  public static TransferableBlock getErrorTransferableBlock(Exception e) {
+    return new TransferableBlock(DataBlockUtils.getErrorDataBlock(e));
+  }
+
+  public static TransferableBlock getEmptyTransferableBlock(DataSchema dataSchema) {
+    return new TransferableBlock(DataBlockUtils.getEmptyDataBlock(dataSchema));
+  }
+
+  public static boolean isEndOfStream(TransferableBlock transferableBlock) {
+    return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
+        && "END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
+        .get(DataTable.MetadataKey.TABLE.getName()));
+  }
+}
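
The hunks below switch every operator over to these helpers; the common consumption pattern is a drain loop that pulls blocks until the END_OF_STREAM metadata marker shows up. A sketch of that loop, where drain() and process() are hypothetical placeholders rather than part of this diff:

    import org.apache.pinot.core.common.Operator;
    import org.apache.pinot.query.runtime.blocks.TransferableBlock;
    import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;

    public final class DrainLoopSketch {
      static void drain(Operator<TransferableBlock> operator) {
        TransferableBlock block = operator.nextBlock();
        while (!TransferableBlockUtils.isEndOfStream(block)) {
          process(block); // consume block.getDataBlock() here
          block = operator.nextBlock();
        }
      }

      private static void process(TransferableBlock block) {
      }
    }
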
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index d8ec43e040..01a97643ab 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -36,8 +36,8 @@ import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -88,7 +88,7 @@ public class WorkerQueryExecutor {
       @Override
       public void runJob() {
         ThreadTimer executionThreadTimer = new ThreadTimer();
-        while (!DataBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
+        while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
           LOGGER.debug("Result Block acquired");
         }
         LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ff99dae5ed..ae1bf43d4c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -25,14 +25,15 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 
 
 /**
@@ -86,14 +87,14 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     try {
       return new TransferableBlock(buildJoinedDataBlock(_leftTableOperator.nextBlock()));
     } catch (Exception e) {
-      return DataBlockUtils.getErrorTransferableBlock(e);
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
   }
 
   private void buildBroadcastHashTable() {
     if (!_isHashTableBuilt) {
       TransferableBlock rightBlock = _rightTableOperator.nextBlock();
-      while (!DataBlockUtils.isEndOfStream(rightBlock)) {
+      while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
         BaseDataBlock dataBlock = rightBlock.getDataBlock();
         _rightTableSchema = dataBlock.getDataSchema();
         int numRows = dataBlock.getNumberOfRows();
@@ -112,7 +113,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
 
   private BaseDataBlock buildJoinedDataBlock(TransferableBlock block)
       throws Exception {
-    if (DataBlockUtils.isEndOfStream(block)) {
+    if (TransferableBlockUtils.isEndOfStream(block)) {
       return DataBlockUtils.getEndOfStreamDataBlock();
     }
     List<Object[]> rows = new ArrayList<>();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 37a203f815..0b8d88cd00 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -25,14 +25,15 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,7 +117,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
     }
     // TODO: we need to at least return one data table with schema if there's no error.
     // we need to condition this on whether there's already things being returned or not.
-    return DataBlockUtils.getEndOfStreamTransferableBlock();
+    return TransferableBlockUtils.getEndOfStreamTransferableBlock();
   }
 
   public RelDistribution.Type getExchangeType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 3681acdc7d..8b5ccd5b99 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -30,6 +30,9 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -37,10 +40,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,7 +123,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
     if (_dataTableBlockBaseOperator != null) {
       transferableBlock = _dataTableBlockBaseOperator.nextBlock();
       dataTable = transferableBlock.getDataBlock();
-      isEndOfStream = DataBlockUtils.isEndOfStream(transferableBlock);
+      isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
     } else {
       dataTable = _dataTable;
       isEndOfStream = true;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 260c083ea7..f0236eb7ff 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -29,14 +29,14 @@ import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
@@ -117,7 +117,7 @@ public class QueryDispatcher {
     TransferableBlock transferableBlock;
     while (true) {
       transferableBlock = mailboxReceiveOperator.nextBlock();
-      if (DataBlockUtils.isEndOfStream(transferableBlock)) {
+      if (TransferableBlockUtils.isEndOfStream(transferableBlock)) {
         break;
       }
       if (transferableBlock.getDataBlock() != null) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index e254778db1..07c69a05ac 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -24,7 +24,7 @@ import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.pinot.common.proto.Mailbox;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index dd405c898d..8bef837793 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -180,8 +180,13 @@ public abstract class BaseServerStarter implements ServiceStartable {
             Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
 
     // Set data table version sent to broker.
-    DataTableBuilder.setCurrentDataTableVersion(_serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION,
-        Server.DEFAULT_CURRENT_DATA_TABLE_VERSION));
+    int dataTableVersion =
+        _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
+    if (dataTableVersion > Server.DEFAULT_CURRENT_DATA_TABLE_VERSION) {
+      throw new UnsupportedOperationException("Setting experimental DataTable version newer than default via config "
+          + "is not allowed. Current default DataTable version: " + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
+    }
+    DataTableBuilder.setCurrentDataTableVersion(dataTableVersion);
 
     LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: {}, instanceId: {}", _zkAddress,
         _helixClusterName, _instanceId);

