You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/02 09:20:11 UTC

[pinot] branch master updated: Refactor DataTable to pinot-common (#9503)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f2547195c Refactor DataTable to pinot-common (#9503)
3f2547195c is described below

commit 3f2547195c36937cffd4ed8332d2e691b008ab2c
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Sun Oct 2 02:20:03 2022 -0700

    Refactor DataTable to pinot-common (#9503)
---
 .../SingleConnectionBrokerRequestHandler.java      |  2 +-
 .../pinot}/common/datablock/BaseDataBlock.java     | 19 +++--
 .../pinot}/common/datablock/ColumnarDataBlock.java |  2 +-
 .../pinot}/common/datablock/DataBlockUtils.java    |  9 +-
 .../pinot}/common/datablock/MetadataBlock.java     |  2 +-
 .../pinot}/common/datablock/RowDataBlock.java      |  2 +-
 .../pinot}/common/datatable/BaseDataTable.java     |  6 +-
 .../common/{utils => datatable}/DataTable.java     |  5 +-
 .../pinot/common/datatable/DataTableFactory.java   | 45 +++++-----
 .../pinot}/common/datatable/DataTableImplV2.java   | 18 ++--
 .../pinot}/common/datatable/DataTableImplV3.java   |  4 +-
 .../pinot}/common/datatable/DataTableImplV4.java   |  4 +-
 .../pinot/common/datatable/DataTableUtils.java     | 93 +++++++++++++++++++++
 .../pinot/common}/request/context/ThreadTimer.java |  2 +-
 .../org/apache/pinot/common/utils/DataSchema.java  |  1 +
 .../pinot/common/utils/RoaringBitmapUtils.java     | 41 +++++-----
 .../connector/PinotGrpcServerDataFetcher.scala     |  3 +-
 .../spark/connector/PinotServerDataFetcher.scala   |  4 +-
 .../connector/spark/connector/PinotUtils.scala     |  2 +-
 .../connector/spark/connector/PinotUtilsTest.scala |  6 +-
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  4 +-
 .../core/common/datablock/DataBlockBuilder.java    | 10 ++-
 .../common/datatable/BaseDataTableBuilder.java     |  4 +-
 .../core/common/datatable/DataTableBuilder.java    |  2 +-
 ...leFactory.java => DataTableBuilderFactory.java} | 61 +++-----------
 ...aTableUtils.java => DataTableBuilderUtils.java} | 95 ++++++----------------
 .../common/datatable/DataTableBuilderV2V3.java     |  5 +-
 .../core/common/datatable/DataTableBuilderV4.java  |  8 +-
 .../core/operator/InstanceResponseOperator.java    |  6 +-
 .../StreamingInstanceResponseOperator.java         |  6 +-
 .../operator/blocks/InstanceResponseBlock.java     |  2 +-
 .../blocks/results/AggregationResultsBlock.java    |  6 +-
 .../operator/blocks/results/BaseResultsBlock.java  |  4 +-
 .../blocks/results/DistinctResultsBlock.java       |  6 +-
 .../blocks/results/ExceptionResultsBlock.java      |  6 +-
 .../blocks/results/GroupByResultsBlock.java        |  8 +-
 .../blocks/results/MetadataResultsBlock.java       |  6 +-
 .../blocks/results/SelectionResultsBlock.java      |  2 +-
 .../core/operator/combine/BaseCombineOperator.java |  2 +-
 .../operator/streaming/StreamingResponseUtils.java |  2 +-
 .../StreamingSelectionOnlyCombineOperator.java     |  2 +-
 .../apache/pinot/core/plan/GlobalPlanImplV0.java   |  2 +-
 .../main/java/org/apache/pinot/core/plan/Plan.java |  2 +-
 .../function/AggregationFunctionUtils.java         |  2 +-
 .../DistinctCountBitmapAggregationFunction.java    | 12 +--
 .../pinot/core/query/distinct/DistinctTable.java   |  7 +-
 .../pinot/core/query/executor/QueryExecutor.java   |  2 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  | 20 ++---
 .../query/reduce/AggregationDataTableReducer.java  |  2 +-
 .../pinot/core/query/reduce/BaseReduceService.java |  4 +-
 .../core/query/reduce/BrokerReduceService.java     |  2 +-
 .../pinot/core/query/reduce/DataTableReducer.java  |  2 +-
 .../query/reduce/DistinctDataTableReducer.java     |  2 +-
 .../query/reduce/ExplainPlanDataTableReducer.java  |  2 +-
 .../core/query/reduce/GroupByDataTableReducer.java |  2 +-
 .../query/reduce/SelectionDataTableReducer.java    |  2 +-
 .../reduce/SelectionOnlyStreamingReducer.java      |  2 +-
 .../core/query/reduce/StreamingReduceService.java  |  4 +-
 .../pinot/core/query/reduce/StreamingReducer.java  |  2 +-
 .../core/query/request/ServerQueryRequest.java     |  5 +-
 .../pinot/core/query/scheduler/QueryScheduler.java | 24 +++---
 .../query/selection/SelectionOperatorService.java  |  2 +-
 .../query/selection/SelectionOperatorUtils.java    |  6 +-
 .../pinot/core/transport/AsyncQueryResponse.java   |  2 +-
 .../pinot/core/transport/DataTableHandler.java     |  4 +-
 .../core/transport/InstanceRequestHandler.java     | 15 ++--
 .../apache/pinot/core/transport/QueryRouter.java   |  4 +-
 .../pinot/core/transport/ServerResponse.java       |  2 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  2 +-
 .../pinot/core/common/datablock/DataBlockTest.java | 11 ++-
 .../core/common/datablock/DataBlockTestUtils.java  |  1 +
 ...ilsTest.java => DataTableBuilderUtilsTest.java} | 12 +--
 .../core/common/datatable/DataTableSerDeTest.java  | 49 +++++------
 .../executor/QueryExecutorExceptionsTest.java      |  2 +-
 .../core/query/executor/QueryExecutorTest.java     |  2 +-
 .../core/query/reduce/BrokerReduceServiceTest.java |  6 +-
 .../query/scheduler/PrioritySchedulerTest.java     |  9 +-
 .../selection/SelectionOperatorServiceTest.java    |  2 +-
 .../pinot/core/transport/QueryRoutingTest.java     | 12 +--
 .../apache/pinot/queries/AllNullQueriesTest.java   |  7 +-
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  4 +-
 .../pinot/queries/BigDecimalQueriesTest.java       |  7 +-
 .../queries/BooleanNullEnabledQueriesTest.java     |  7 +-
 .../queries/DistinctCountBitmapQueriesTest.java    |  4 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      |  4 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java |  2 +-
 .../pinot/queries/NullEnabledQueriesTest.java      |  7 +-
 .../queries/SegmentWithNullValueVectorTest.java    |  2 +-
 .../tests/MultiStageEngineIntegrationTest.java     |  8 +-
 .../tests/OfflineGRPCServerIntegrationTest.java    |  6 +-
 ...PartitionLLCRealtimeClusterIntegrationTest.java |  2 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  8 +-
 .../query/runtime/blocks/TransferableBlock.java    |  8 +-
 .../runtime/blocks/TransferableBlockUtils.java     |  2 +-
 .../runtime/executor/WorkerQueryExecutor.java      |  2 +-
 .../query/runtime/operator/AggregateOperator.java  |  2 +-
 .../query/runtime/operator/FilterOperator.java     |  4 +-
 .../query/runtime/operator/HashJoinOperator.java   |  4 +-
 .../runtime/operator/LiteralValueOperator.java     |  2 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  6 +-
 .../runtime/operator/MailboxSendOperator.java      |  4 +-
 .../pinot/query/runtime/operator/SortOperator.java |  2 +-
 .../query/runtime/operator/TransformOperator.java  |  4 +-
 .../pinot/query/service/QueryDispatcher.java       |  6 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      |  6 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  7 +-
 .../DistinctCountBitmapValueAggregator.java        |  6 +-
 .../server/starter/helix/BaseServerStarter.java    | 12 +--
 .../apache/pinot/spi/utils/CommonConstants.java    |  1 -
 109 files changed, 470 insertions(+), 438 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 19c081c016..c34b8dda6a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -33,6 +33,7 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -40,7 +41,6 @@ import org.apache.pinot.common.metrics.BrokerQueryPhase;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.transport.AsyncQueryResponse;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
similarity index 97%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
index aa55d4cfb7..8584491161 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datablock;
+package org.apache.pinot.common.datablock;
 
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
@@ -28,13 +28,14 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV3;
+import org.apache.pinot.common.datatable.DataTableUtils;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
-import org.apache.pinot.core.common.datatable.DataTableUtils;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
@@ -43,7 +44,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 /**
- * Base data block mostly replicating implementation of {@link org.apache.pinot.core.common.datatable.DataTableImplV3}.
+ * Base data block mostly replicating implementation of {@link DataTableImplV3}.
  *
  * +-----------------------------------------------+
  * | 13 integers of header:                        |
@@ -353,7 +354,7 @@ public abstract class BaseDataBlock implements DataTable {
     int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
     int type = _variableSizeData.getInt();
     if (size == 0) {
-      assert type == ObjectSerDeUtils.NULL_TYPE_VALUE;
+      assert type == CustomObject.NULL_TYPE_VALUE;
       return null;
     }
     ByteBuffer buffer = _variableSizeData.slice();
@@ -376,7 +377,7 @@ public abstract class BaseDataBlock implements DataTable {
       _variableSizeData.position(offset);
       byte[] nullBitmapBytes = new byte[bytesLength];
       _variableSizeData.get(nullBitmapBytes);
-      return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
+      return RoaringBitmapUtils.deserialize(nullBitmapBytes);
     } else {
       return null;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java
similarity index 98%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java
index cdf8ed0849..216f4d9d91 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datablock;
+package org.apache.pinot.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
similarity index 96%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index da0e857017..ed29c3b46d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datablock;
+package org.apache.pinot.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nonnull;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -32,11 +31,11 @@ import org.roaringbitmap.RoaringBitmap;
 
 
 public final class DataBlockUtils {
-  protected static final int VERSION_TYPE_SHIFT = 5;
   private DataBlockUtils() {
-    // do not instantiate.
   }
 
+  static final int VERSION_TYPE_SHIFT = 5;
+
   public static MetadataBlock getErrorDataBlock(Exception e) {
     String errorMessage = e.getMessage() == null ? e.toString() : e.getMessage();
     if (e instanceof ProcessingException) {
@@ -54,7 +53,7 @@ public final class DataBlockUtils {
     return errorBlock;
   }
 
-  public static MetadataBlock getEndOfStreamDataBlock(@Nonnull DataSchema dataSchema) {
+  public static MetadataBlock getEndOfStreamDataBlock(DataSchema dataSchema) {
     // TODO: add query statistics metadata for the block.
     return new MetadataBlock(dataSchema);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
similarity index 97%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
index d469ec0287..8b063e4024 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datablock;
+package org.apache.pinot.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java
similarity index 98%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java
index 5bdf121fad..8e3f720867 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datablock;
+package org.apache.pinot.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
similarity index 98%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
index 6b5c6d4564..ae6d7f03a5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/BaseDataTable.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.common.datatable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -28,8 +28,6 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
@@ -249,7 +247,7 @@ public abstract class BaseDataTable implements DataTable {
     int size = positionCursorInVariableBuffer(rowId, colId);
     int type = _variableSizeData.getInt();
     if (size == 0) {
-      assert type == ObjectSerDeUtils.NULL_TYPE_VALUE;
+      assert type == CustomObject.NULL_TYPE_VALUE;
       return null;
     }
     ByteBuffer buffer = _variableSizeData.slice();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
similarity index 98%
rename from pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 1d86715bcf..5a5f323c93 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.utils;
+package org.apache.pinot.common.datatable;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
@@ -89,6 +90,8 @@ public interface DataTable {
   DataTable toDataOnlyDataTable();
 
   class CustomObject {
+    public static final int NULL_TYPE_VALUE = 100;
+
     private final int _type;
     private final ByteBuffer _buffer;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
similarity index 52%
copy from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
copy to pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
index ba65453ec9..55374d7cca 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
@@ -16,38 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.common.datatable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.datablock.RowDataBlock;
-import org.apache.pinot.spi.annotations.InterfaceStability;
-
 
-/**
- * Datatable V4 Implementation is a wrapper around the Row-based data block.
- */
-@InterfaceStability.Evolving
-public class DataTableImplV4 extends RowDataBlock {
 
-  public DataTableImplV4() {
-    super();
+public class DataTableFactory {
+  private DataTableFactory() {
   }
 
-  public DataTableImplV4(ByteBuffer byteBuffer)
-      throws IOException {
-    super(byteBuffer);
-  }
+  public static final int VERSION_2 = 2;
+  public static final int VERSION_3 = 3;
+  public static final int VERSION_4 = 4;
 
-  public DataTableImplV4(int numRows, DataSchema dataSchema, String[] dictionary, byte[] fixedSizeDataBytes,
-      byte[] variableSizeDataBytes) {
-    super(numRows, dataSchema, dictionary, fixedSizeDataBytes, variableSizeDataBytes);
+  public static DataTable getDataTable(ByteBuffer byteBuffer)
+      throws IOException {
+    int version = byteBuffer.getInt();
+    switch (version) {
+      case VERSION_2:
+        return new DataTableImplV2(byteBuffer);
+      case VERSION_3:
+        return new DataTableImplV3(byteBuffer);
+      case VERSION_4:
+        return new DataTableImplV4(byteBuffer);
+      default:
+        throw new IllegalStateException("Unsupported data table version: " + version);
+    }
   }
 
-  @Override
-  protected int getDataBlockVersionType() {
-    return DataTableFactory.VERSION_4;
+  public static DataTable getDataTable(byte[] bytes)
+      throws IOException {
+    return getDataTable(ByteBuffer.wrap(bytes));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java
similarity index 95%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java
index 252d7303b2..c517502ebc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.common.datatable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -129,16 +129,16 @@ public class DataTableImplV2 extends BaseDataTable {
 
   private Map<String, String> deserializeMetadata(ByteBuffer buffer)
       throws IOException {
-      int numEntries = buffer.getInt();
-      Map<String, String> metadata = new HashMap<>(numEntries);
+    int numEntries = buffer.getInt();
+    Map<String, String> metadata = new HashMap<>(numEntries);
 
-      for (int i = 0; i < numEntries; i++) {
-        String key = DataTableUtils.decodeString(buffer);
-        String value = DataTableUtils.decodeString(buffer);
-        metadata.put(key, value);
-      }
+    for (int i = 0; i < numEntries; i++) {
+      String key = DataTableUtils.decodeString(buffer);
+      String value = DataTableUtils.decodeString(buffer);
+      metadata.put(key, value);
+    }
 
-      return metadata;
+    return metadata;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
similarity index 99%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
rename to pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
index 0e214f3a47..07e90f1820 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.common.datatable;
 
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
@@ -27,9 +27,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
similarity index 93%
copy from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
copy to pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index ba65453ec9..5f1c6858f7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.common.datatable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.pinot.common.datablock.RowDataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.datablock.RowDataBlock;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
new file mode 100644
index 0000000000..a1ddd8d104
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datatable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.DataSchema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+public class DataTableUtils {
+  private DataTableUtils() {
+  }
+
+  /**
+   * Given a {@link DataSchema}, compute each column's offset and fill them into the passed in array, then return the
+   * row size in bytes.
+   *
+   * @param dataSchema data schema.
+   * @param columnOffsets array of column offsets.
+   * @return row size in bytes.
+   */
+  public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets, int dataTableVersion) {
+    int numColumns = columnOffsets.length;
+    assert numColumns == dataSchema.size();
+
+    DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    int rowSizeInBytes = 0;
+    for (int i = 0; i < numColumns; i++) {
+      columnOffsets[i] = rowSizeInBytes;
+      switch (storedColumnDataTypes[i]) {
+        case INT:
+          rowSizeInBytes += 4;
+          break;
+        case LONG:
+          rowSizeInBytes += 8;
+          break;
+        case FLOAT:
+          if (dataTableVersion >= DataTableFactory.VERSION_4) {
+            rowSizeInBytes += 4;
+          } else {
+            rowSizeInBytes += 8;
+          }
+          break;
+        case DOUBLE:
+          rowSizeInBytes += 8;
+          break;
+        case STRING:
+          rowSizeInBytes += 4;
+          break;
+        // Object and array. (POSITION|LENGTH)
+        default:
+          rowSizeInBytes += 8;
+          break;
+      }
+    }
+
+    return rowSizeInBytes;
+  }
+
+  /**
+   * Helper method to decode string.
+   */
+  public static String decodeString(ByteBuffer buffer)
+      throws IOException {
+    int length = buffer.getInt();
+    if (length == 0) {
+      return StringUtils.EMPTY;
+    } else {
+      byte[] bytes = new byte[length];
+      buffer.get(bytes);
+      return new String(bytes, UTF_8);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/ThreadTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/ThreadTimer.java
similarity index 97%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/request/context/ThreadTimer.java
rename to pinot-common/src/main/java/org/apache/pinot/common/request/context/ThreadTimer.java
index 42d9557a1c..48736ac00e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/ThreadTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/context/ThreadTimer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.request.context;
+package org.apache.pinot.common.request.context;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 4370e7623a..dbb56afa0d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.EnumSet;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
similarity index 51%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
rename to pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
index ba65453ec9..861ac5c114 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoaringBitmapUtils.java
@@ -16,38 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.common.utils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.datablock.RowDataBlock;
-import org.apache.pinot.spi.annotations.InterfaceStability;
-
+import org.roaringbitmap.RoaringBitmap;
 
-/**
- * Datatable V4 Implementation is a wrapper around the Row-based data block.
- */
-@InterfaceStability.Evolving
-public class DataTableImplV4 extends RowDataBlock {
 
-  public DataTableImplV4() {
-    super();
+public class RoaringBitmapUtils {
+  private RoaringBitmapUtils() {
   }
 
-  public DataTableImplV4(ByteBuffer byteBuffer)
-      throws IOException {
-    super(byteBuffer);
+  public static byte[] serialize(RoaringBitmap bitmap) {
+    byte[] bytes = new byte[bitmap.serializedSizeInBytes()];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    bitmap.serialize(byteBuffer);
+    return bytes;
   }
 
-  public DataTableImplV4(int numRows, DataSchema dataSchema, String[] dictionary, byte[] fixedSizeDataBytes,
-      byte[] variableSizeDataBytes) {
-    super(numRows, dataSchema, dictionary, fixedSizeDataBytes, variableSizeDataBytes);
+  public static RoaringBitmap deserialize(byte[] bytes) {
+    return deserialize(ByteBuffer.wrap(bytes));
   }
 
-  @Override
-  protected int getDataBlockVersionType() {
-    return DataTableFactory.VERSION_4;
+  public static RoaringBitmap deserialize(ByteBuffer byteBuffer) {
+    RoaringBitmap bitmap = new RoaringBitmap();
+    try {
+      bitmap.deserialize(byteBuffer);
+    } catch (IOException e) {
+      throw new RuntimeException("Caught exception while deserializing RoaringBitmap", e);
+    }
+    return bitmap;
   }
 }
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
index cd881527be..78cb960998 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -19,12 +19,11 @@
 package org.apache.pinot.connector.spark.connector
 
 import io.grpc.ManagedChannelBuilder
+import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
 import org.apache.pinot.common.proto.PinotQueryServerGrpc
 import org.apache.pinot.common.proto.Server.ServerRequest
-import org.apache.pinot.common.utils.DataTable
 import org.apache.pinot.connector.spark.exceptions.PinotException
 import org.apache.pinot.connector.spark.utils.Logging
-import org.apache.pinot.core.common.datatable.DataTableFactory
 import org.apache.pinot.spi.config.table.TableType
 
 import scala.collection.JavaConverters._
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
index d8d0d1bd7f..81bb405d5e 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
@@ -18,11 +18,10 @@
  */
 package org.apache.pinot.connector.spark.connector
 
-import java.util.{List => JList, Map => JMap}
 import org.apache.helix.model.InstanceConfig
+import org.apache.pinot.common.datatable.DataTable
 import org.apache.pinot.common.metrics.BrokerMetrics
 import org.apache.pinot.common.request.BrokerRequest
-import org.apache.pinot.common.utils.DataTable
 import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
 import org.apache.pinot.connector.spark.exceptions.PinotException
 import org.apache.pinot.connector.spark.utils.Logging
@@ -33,6 +32,7 @@ import org.apache.pinot.spi.env.PinotConfiguration
 import org.apache.pinot.spi.metrics.PinotMetricUtils
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler
 
+import java.util.{List => JList, Map => JMap}
 import scala.collection.JavaConverters._
 
 /**
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala
index bac30cae92..b7a61803c5 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotUtils.scala
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.connector.spark.connector
 
+import org.apache.pinot.common.datatable.DataTable
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType
-import org.apache.pinot.common.utils.DataTable
 import org.apache.pinot.connector.spark.exceptions.PinotException
 import org.apache.pinot.spi.data.{FieldSpec, Schema}
 import org.apache.spark.sql.catalyst.InternalRow
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
index 4929ebc122..673e7c982e 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
@@ -23,7 +23,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType
 import org.apache.pinot.connector.spark.BaseTest
 import org.apache.pinot.connector.spark.connector.PinotUtils._
 import org.apache.pinot.connector.spark.exceptions.PinotException
-import org.apache.pinot.core.common.datatable.DataTableFactory
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory
 import org.apache.pinot.spi.data.Schema
 import org.apache.pinot.spi.utils.ByteArray
 import org.apache.spark.sql.catalyst.util.ArrayData
@@ -74,7 +74,7 @@ class PinotUtilsTest extends BaseTest {
     )
     val dataSchema = new DataSchema(columnNames, columnTypes)
 
-    val dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema)
+    val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
     dataTableBuilder.startRow()
     dataTableBuilder.setColumn(0, "strValueDim")
     dataTableBuilder.setColumn(1, 5)
@@ -140,7 +140,7 @@ class PinotUtilsTest extends BaseTest {
     val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT)
     val dataSchema = new DataSchema(columnNames, columnTypes)
 
-    val dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema)
+    val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
     dataTableBuilder.startRow()
     dataTableBuilder.setColumn(0, "strValueDim")
     dataTableBuilder.setColumn(1, 5)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index b0781bf88e..6256020f4a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -57,7 +57,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
@@ -87,8 +87,6 @@ public class ObjectSerDeUtils {
   private ObjectSerDeUtils() {
   }
 
-  public static final int NULL_TYPE_VALUE = 100;
-
   public enum ObjectType {
     // NOTE: DO NOT change the value, we rely on the value to indicate the object type
     String(0),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 5c79e291f6..72541bd02d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -27,7 +27,13 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.ColumnarDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.RowDataBlock;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -81,7 +87,7 @@ public class DataBlockBuilder {
     if (nullRowIds == null || nullRowIds.isEmpty()) {
       _fixedSizeDataOutputStream.writeInt(0);
     } else {
-      byte[] bitmapBytes = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullRowIds);
+      byte[] bitmapBytes = RoaringBitmapUtils.serialize(nullRowIds);
       _fixedSizeDataOutputStream.writeInt(bitmapBytes.length);
       _variableSizeDataByteArrayOutputStream.write(bitmapBytes);
     }
@@ -449,7 +455,7 @@ public class DataBlockBuilder {
     byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
     if (value == null) {
       byteBuffer.putInt(0);
-      builder._variableSizeDataOutputStream.writeInt(ObjectSerDeUtils.NULL_TYPE_VALUE);
+      builder._variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE);
     } else {
       int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
       byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
index 63e4623a58..9547924ac5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -101,7 +103,7 @@ public abstract class BaseDataTableBuilder implements DataTableBuilder {
     _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
     if (value == null) {
       _currentRowDataByteBuffer.putInt(0);
-      _variableSizeDataOutputStream.writeInt(ObjectSerDeUtils.NULL_TYPE_VALUE);
+      _variableSizeDataOutputStream.writeInt(DataTable.CustomObject.NULL_TYPE_VALUE);
     } else {
       int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
       byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index 08a874e484..cd4d9d3003 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.common.datatable;
 import java.io.IOException;
 import java.math.BigDecimal;
 import javax.annotation.Nullable;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.utils.ByteArray;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
similarity index 52%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
index cbe64b4aef..5d9e616974 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
@@ -18,25 +18,21 @@
  */
 package org.apache.pinot.core.common.datatable;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DataTableFactory {
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataTableFactory.class);
-  public static final int VERSION_2 = 2;
-  public static final int VERSION_3 = 3;
-  public static final int VERSION_4 = 4;
-  public static final int DEFAULT_VERSION = VERSION_3;
+public class DataTableBuilderFactory {
+  private DataTableBuilderFactory() {
+  }
 
-  private static int _version = DataTableFactory.DEFAULT_VERSION;
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataTableBuilderFactory.class);
 
-  private DataTableFactory() {
-  }
+  public static final int DEFAULT_VERSION = DataTableFactory.VERSION_3;
+
+  private static int _version = DEFAULT_VERSION;
 
   public static int getDataTableVersion() {
     return _version;
@@ -51,48 +47,15 @@ public class DataTableFactory {
     _version = version;
   }
 
-  public static DataTable getEmptyDataTable() {
-    switch (_version) {
-      case VERSION_2:
-        return new DataTableImplV2();
-      case VERSION_3:
-        return new DataTableImplV3();
-      case VERSION_4:
-        return new DataTableImplV4();
-      default:
-        throw new IllegalStateException("Unexpected value: " + _version);
-    }
-  }
-
   public static DataTableBuilder getDataTableBuilder(DataSchema dataSchema) {
     switch (_version) {
-      case VERSION_2:
-      case VERSION_3:
+      case DataTableFactory.VERSION_2:
+      case DataTableFactory.VERSION_3:
         return new DataTableBuilderV2V3(dataSchema, _version);
-      case VERSION_4:
+      case DataTableFactory.VERSION_4:
         return new DataTableBuilderV4(dataSchema);
       default:
-        throw new UnsupportedOperationException("Unsupported data table version: " + _version);
+        throw new IllegalStateException("Unsupported data table version: " + _version);
     }
   }
-
-  public static DataTable getDataTable(ByteBuffer byteBuffer)
-      throws IOException {
-    int version = byteBuffer.getInt();
-    switch (version) {
-      case VERSION_2:
-        return new DataTableImplV2(byteBuffer);
-      case VERSION_3:
-        return new DataTableImplV3(byteBuffer);
-      case VERSION_4:
-        return new DataTableImplV4(byteBuffer);
-      default:
-        throw new UnsupportedOperationException("Unsupported data table version: " + version);
-    }
-  }
-
-  public static DataTable getDataTable(byte[] bytes)
-      throws IOException {
-    return getDataTable(ByteBuffer.wrap(bytes));
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
similarity index 75%
rename from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
index db7329ee71..bc631af2fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
@@ -19,76 +19,47 @@
 package org.apache.pinot.core.common.datatable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV2;
+import org.apache.pinot.common.datatable.DataTableImplV3;
+import org.apache.pinot.common.datatable.DataTableImplV4;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 
 /**
  * The <code>DataTableUtils</code> class provides utility methods for data table.
  */
 @SuppressWarnings("rawtypes")
-public class DataTableUtils {
-  private DataTableUtils() {
+public class DataTableBuilderUtils {
+  private DataTableBuilderUtils() {
   }
 
   /**
-   * Given a {@link DataSchema}, compute each column's offset and fill them into the passed in array, then return the
-   * row size in bytes.
-   *
-   * @param dataSchema data schema.
-   * @param columnOffsets array of column offsets.
-   * @return row size in bytes.
+   * Returns an empty data table without data.
    */
-  public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets, int dataTableVersion) {
-    int numColumns = columnOffsets.length;
-    assert numColumns == dataSchema.size();
-
-    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
-    int rowSizeInBytes = 0;
-    for (int i = 0; i < numColumns; i++) {
-      columnOffsets[i] = rowSizeInBytes;
-      switch (storedColumnDataTypes[i]) {
-        case INT:
-          rowSizeInBytes += 4;
-          break;
-        case LONG:
-          rowSizeInBytes += 8;
-          break;
-        case FLOAT:
-          if (dataTableVersion >= DataTableFactory.VERSION_4) {
-            rowSizeInBytes += 4;
-          } else {
-            rowSizeInBytes += 8;
-          }
-          break;
-        case DOUBLE:
-          rowSizeInBytes += 8;
-          break;
-        case STRING:
-          rowSizeInBytes += 4;
-          break;
-        // Object and array. (POSITION|LENGTH)
-        default:
-          rowSizeInBytes += 8;
-          break;
-      }
+  public static DataTable getEmptyDataTable() {
+    int version = DataTableBuilderFactory.getDataTableVersion();
+    switch (version) {
+      case DataTableFactory.VERSION_2:
+        return new DataTableImplV2();
+      case DataTableFactory.VERSION_3:
+        return new DataTableImplV3();
+      case DataTableFactory.VERSION_4:
+        return new DataTableImplV4();
+      default:
+        throw new IllegalStateException("Unsupported data table version: " + version);
     }
-
-    return rowSizeInBytes;
   }
 
   /**
@@ -120,7 +91,7 @@ public class DataTableUtils {
     // NOTE: Use STRING column data type as default for selection query
     Arrays.fill(columnDataTypes, ColumnDataType.STRING);
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    return DataTableFactory.getDataTableBuilder(dataSchema).build();
+    return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build();
   }
 
   /**
@@ -151,7 +122,7 @@ public class DataTableUtils {
         columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType();
         index++;
       }
-      return DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes)).build();
+      return DataTableBuilderFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes)).build();
     } else {
       // Aggregation only query
 
@@ -169,7 +140,7 @@ public class DataTableUtils {
 
       // Build the data table
       DataTableBuilder dataTableBuilder =
-          DataTableFactory.getDataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes));
+          DataTableBuilderFactory.getDataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes));
       dataTableBuilder.startRow();
       for (int i = 0; i < numAggregations; i++) {
         switch (columnDataTypes[i]) {
@@ -208,11 +179,12 @@ public class DataTableUtils {
     ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length];
     // NOTE: Use STRING column data type as default for distinct query
     Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-    DistinctTable distinctTable = new DistinctTable(
-        new DataSchema(columnNames, columnDataTypes), Collections.emptySet(), queryContext.isNullHandlingEnabled());
+    DistinctTable distinctTable =
+        new DistinctTable(new DataSchema(columnNames, columnDataTypes), Collections.emptySet(),
+            queryContext.isNullHandlingEnabled());
 
     // Build the data table
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(
         new DataSchema(new String[]{distinctAggregationFunction.getColumnName()},
             new ColumnDataType[]{ColumnDataType.OBJECT}));
     dataTableBuilder.startRow();
@@ -220,19 +192,4 @@ public class DataTableUtils {
     dataTableBuilder.finishRow();
     return dataTableBuilder.build();
   }
-
-  /**
-   * Helper method to decode string.
-   */
-  public static String decodeString(ByteBuffer buffer)
-      throws IOException {
-    int length = buffer.getInt();
-    if (length == 0) {
-      return StringUtils.EMPTY;
-    } else {
-      byte[] bytes = new byte[length];
-      buffer.get(bytes);
-      return new String(bytes, UTF_8);
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
index ea282bb559..375b1b4ad1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
@@ -23,8 +23,11 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV2;
+import org.apache.pinot.common.datatable.DataTableImplV3;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
index 8512024865..df18e73dc4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
@@ -22,9 +22,11 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.io.IOException;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV4;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
@@ -72,7 +74,7 @@ public class DataTableBuilderV4 extends BaseDataTableBuilder {
     if (nullRowIds == null || nullRowIds.isEmpty()) {
       _fixedSizeDataOutputStream.writeInt(0);
     } else {
-      byte[] bitmapBytes = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullRowIds);
+      byte[] bitmapBytes = RoaringBitmapUtils.serialize(nullRowIds);
       _fixedSizeDataOutputStream.writeInt(bitmapBytes.length);
       _variableSizeDataByteArrayOutputStream.write(bitmapBytes);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index 97aa3eccb4..6bfdb3717e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -21,14 +21,14 @@ package org.apache.pinot.core.operator;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
index ca411972b3..3243dcd406 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
@@ -21,10 +21,10 @@ package org.apache.pinot.core.operator;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.List;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
@@ -54,7 +54,7 @@ public class StreamingInstanceResponseOperator extends InstanceResponseOperator
       _streamObserver.onNext(StreamingResponseUtils.getDataResponse(instanceResponseDataTable.toDataOnlyDataTable()));
     } catch (IOException e) {
       // when exception occurs in streaming, we return an error-only metadata block.
-      metadataOnlyDataTable = DataTableFactory.getEmptyDataTable();
+      metadataOnlyDataTable = DataTableBuilderUtils.getEmptyDataTable();
       metadataOnlyDataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
     }
     // return a metadata-only block.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
index 1e3d843831..45170ff0d2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks;
 
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 02da5baf44..10c641713e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -22,11 +22,11 @@ import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -72,7 +72,7 @@ public class AggregationResultsBlock extends BaseResultsBlock {
 
     // Build the data table.
     DataTableBuilder dataTableBuilder =
-        DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes));
+        DataTableBuilderFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes));
     if (queryContext.isNullHandlingEnabled()) {
       RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
       for (int i = 0; i < numColumns; i++) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
index fa897b50e2..24573e04c1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
@@ -24,9 +24,9 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
index 8c81b64e27..b721671726 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -54,7 +54,7 @@ public class DistinctResultsBlock extends BaseResultsBlock {
     String[] columnNames = new String[]{_distinctFunction.getColumnName()};
     ColumnDataType[] columnDataTypes = new ColumnDataType[]{ColumnDataType.OBJECT};
     DataTableBuilder dataTableBuilder =
-        DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes));
+        DataTableBuilderFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes));
     dataTableBuilder.startRow();
     dataTableBuilder.setColumn(0, _distinctTable);
     dataTableBuilder.finishRow();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index fca3dc5838..23a6a2120d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
 
@@ -38,7 +38,7 @@ public class ExceptionResultsBlock extends BaseResultsBlock {
   @Override
   public DataTable getDataTable(QueryContext queryContext)
       throws Exception {
-    DataTable dataTable = DataTableFactory.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
     attachMetadataToDataTable(dataTable);
     return dataTable;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 3e5c534a65..3cd88b95cc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -25,12 +25,12 @@ import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.Table;
@@ -127,7 +127,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
   public DataTable getDataTable(QueryContext queryContext)
       throws Exception {
     Preconditions.checkState(_table != null, "Cannot get DataTable from segment level results");
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(_dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(_dataSchema);
     ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
     int numColumns = _dataSchema.size();
     Iterator<Record> iterator = _table.iterator();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
index 7a5d83022d..7ffeb0643d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
 
@@ -28,7 +28,7 @@ public class MetadataResultsBlock extends BaseResultsBlock {
   @Override
   public DataTable getDataTable(QueryContext queryContext)
       throws Exception {
-    DataTable dataTable = DataTableFactory.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
     attachMetadataToDataTable(dataTable);
     return dataTable;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 2cfef8b1ae..4f714ae7be 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -19,8 +19,8 @@
 package org.apache.pinot.core.operator.blocks.results;
 
 import java.util.Collection;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 2031cd030e..59ae72430d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,13 +29,13 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
index 79753c08a0..bdf5464aaa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
@@ -20,8 +20,8 @@ package org.apache.pinot.core.operator.streaming;
 
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.spi.utils.CommonConstants.Query.Response;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
index e19a45d1fe..771eefdee4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -25,10 +25,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
index 832b9beede..d2bff6ab65 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.plan;
 
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.core.operator.InstanceResponseOperator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.slf4j.Logger;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
index 00ffe0674e..567dce0f6a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.plan;
 
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 71c9e3fd73..8ef21fa1b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -26,9 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
index b7ffe02b57..a2faa1d0ea 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
@@ -21,8 +21,8 @@ package org.apache.pinot.core.query.aggregation.function;
 import java.util.Map;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
@@ -72,13 +72,13 @@ public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggre
       RoaringBitmap valueBitmap = aggregationResultHolder.getResult();
       if (valueBitmap != null) {
         for (int i = 0; i < length; i++) {
-          valueBitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]));
+          valueBitmap.or(RoaringBitmapUtils.deserialize(bytesValues[i]));
         }
       } else {
-        valueBitmap = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[0]);
+        valueBitmap = RoaringBitmapUtils.deserialize(bytesValues[0]);
         aggregationResultHolder.setValue(valueBitmap);
         for (int i = 1; i < length; i++) {
-          valueBitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]));
+          valueBitmap.or(RoaringBitmapUtils.deserialize(bytesValues[i]));
         }
       }
       return;
@@ -139,7 +139,7 @@ public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggre
     if (storedType == DataType.BYTES) {
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       for (int i = 0; i < length; i++) {
-        RoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]);
+        RoaringBitmap value = RoaringBitmapUtils.deserialize(bytesValues[i]);
         int groupKey = groupKeyArray[i];
         RoaringBitmap valueBitmap = groupByResultHolder.getResult(groupKey);
         if (valueBitmap != null) {
@@ -209,7 +209,7 @@ public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggre
     if (storedType == DataType.BYTES) {
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
       for (int i = 0; i < length; i++) {
-        RoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]);
+        RoaringBitmap value = RoaringBitmapUtils.deserialize(bytesValues[i]);
         for (int groupKey : groupKeysArray[i]) {
           RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
           if (bitmap != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
index 8a8dddf308..79615b8e88 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
@@ -32,12 +32,13 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
@@ -276,7 +277,7 @@ public class DistinctTable {
   public byte[] toBytes()
       throws IOException {
     // NOTE: Serialize the DistinctTable as a DataTable
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(_dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(_dataSchema);
     ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
     int numColumns = storedColumnDataTypes.length;
     RoaringBitmap[] nullBitmaps = null;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
index 42140bce31..4712d7f911 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
@@ -23,9 +23,9 @@ import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.spi.env.PinotConfiguration;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index a50ff8b131..66461646a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -34,6 +34,8 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.function.TransformFunctionType;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -44,15 +46,13 @@ import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.ExplainPlanRowData;
 import org.apache.pinot.core.common.ExplainPlanRows;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
-import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
@@ -177,7 +177,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       String errorMessage =
           String.format("Query scheduling took %dms (longer than query timeout of %dms) on server: %s",
               querySchedulingTimeMs, queryTimeoutMs, _instanceDataManager.getInstanceId());
-      DataTable dataTable = DataTableFactory.getEmptyDataTable();
+      DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
       dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage));
       LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
       return dataTable;
@@ -187,7 +187,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     if (tableDataManager == null) {
       String errorMessage = String.format("Failed to find table: %s on server: %s", tableNameWithType,
           _instanceDataManager.getInstanceId());
-      DataTable dataTable = DataTableFactory.getEmptyDataTable();
+      DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
       dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage));
       LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
       return dataTable;
@@ -250,7 +250,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
           queryRequest.isEnableStreaming());
     } catch (Exception e) {
       _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
-      dataTable = DataTableFactory.getEmptyDataTable();
+      dataTable = DataTableBuilderUtils.getEmptyDataTable();
       // Do not log verbose error for BadQueryRequestException and QueryCancelledException.
       if (e instanceof BadQueryRequestException) {
         LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
@@ -356,7 +356,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       if (queryContext.isExplain()) {
         dataTable = getExplainPlanResultsForNoMatchingSegment(totalSegments);
       } else {
-        dataTable = DataTableUtils.buildEmptyDataTable(queryContext);
+        dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
       }
 
       Map<String, String> metadata = dataTable.getMetadata();
@@ -398,7 +398,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
 
   /** @return EXPLAIN_PLAN query result {@link DataTable} when no segments get selected for query execution.*/
   private static DataTable getExplainPlanResultsForNoMatchingSegment(int totalNumSegments) {
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
     try {
       dataTableBuilder.startRow();
       dataTableBuilder.setColumn(0, String.format(ExplainPlanRows.PLAN_START_FORMAT, totalNumSegments));
@@ -503,7 +503,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
 
   /** @return EXPLAIN PLAN query result {@link DataTable}. */
   public static DataTable processExplainPlanQueries(Plan queryPlan) {
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
     List<Operator> childOperators = queryPlan.getPlanNode().run().getChildOperators();
     assert childOperators.size() == 1;
     Operator root = childOperators.get(0);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index 238fb557b1..b727df9c30 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -22,12 +22,12 @@ import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 620dce3b1a..9efac85c93 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongConsumer;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
@@ -36,8 +38,6 @@ import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index d25623c698..74156f5503 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -22,11 +22,11 @@ import java.util.Iterator;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
index 27ba435f3a..496df03da8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
@@ -19,10 +19,10 @@
 package org.apache.pinot.core.query.reduce;
 
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 72507411ab..97c39b0e40 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -24,12 +24,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java
index 0c43ac81e1..fd35d6ca5f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExplainPlanDataTableReducer.java
@@ -25,11 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.ExplainPlanRowData;
 import org.apache.pinot.core.common.ExplainPlanRows;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index cfa1621e68..97272fb292 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -41,7 +42,6 @@ import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index 0d1aad7934..495995a9fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -30,7 +31,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorService;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
index d040e4a998..cda1ad2b87 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
@@ -21,10 +21,10 @@ package org.apache.pinot.core.query.reduce;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index c1f6ef8d23..3a01dd05d3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -27,13 +27,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java
index 6d8a6e20c0..4b677c0c77 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index ec10d0c228..b0904e4ea2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -21,12 +21,13 @@ package org.apache.pinot.core.query.request;
 import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -93,7 +94,7 @@ public class ServerQueryRequest {
   private static QueryContext getQueryContext(PinotQuery pinotQuery) {
     QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
     if (queryContext.isNullHandlingEnabled()) {
-      Preconditions.checkState(DataTableFactory.getDataTableVersion() >= DataTableFactory.VERSION_4,
+      Preconditions.checkState(DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4,
           "Null handling cannot be enabled for data table version smaller than 4");
     }
     return queryContext;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index c0f5211a1e..0ce0046a99 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -36,9 +38,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.TimerContext;
@@ -71,6 +71,7 @@ public abstract class QueryScheduler {
   private final RateLimiter _numDroppedLogRateLimiter;
   private final AtomicInteger _numDroppedLogCounter;
   protected volatile boolean _isRunning = false;
+
   /**
    * Constructor to initialize QueryScheduler
    * @param queryExecutor QueryExecutor engine to use
@@ -152,7 +153,7 @@ public abstract class QueryScheduler {
           queryRequest.getBrokerId(), e);
       // For not handled exceptions
       _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
-      dataTable = DataTableFactory.getEmptyDataTable();
+      dataTable = DataTableBuilderUtils.getEmptyDataTable();
       dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR, e));
     }
     long requestId = queryRequest.getRequestId();
@@ -174,14 +175,11 @@ public abstract class QueryScheduler {
     long numSegmentsMatched = Long.parseLong(
         dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
     long numSegmentsPrunedInvalid = Long.parseLong(
-        dataTableMetadata.getOrDefault(
-            MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT));
+        dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT));
     long numSegmentsPrunedByLimit = Long.parseLong(
-        dataTableMetadata.getOrDefault(
-            MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT));
+        dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT));
     long numSegmentsPrunedByValue = Long.parseLong(
-        dataTableMetadata.getOrDefault(
-            MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
+        dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
     long numSegmentsConsuming = Long.parseLong(
         dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), INVALID_SEGMENTS_COUNT));
     long numConsumingSegmentsProcessed = Long.parseLong(
@@ -250,8 +248,8 @@ public abstract class QueryScheduler {
               + "broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
               + "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType,
           numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
-          numConsumingSegmentsProcessed, numConsumingSegmentsMatched,
-          numSegmentsPrunedInvalid, numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
+          numConsumingSegmentsProcessed, numConsumingSegmentsMatched, numSegmentsPrunedInvalid,
+          numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
           timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
           timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
           timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
@@ -346,7 +344,7 @@ public abstract class QueryScheduler {
    */
   protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
       ProcessingException error) {
-    DataTable result = DataTableFactory.getEmptyDataTable();
+    DataTable result = DataTableBuilderUtils.getEmptyDataTable();
 
     Map<String, String> dataTableMetadata = result.getMetadata();
     dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 16ca57d91f..0a6b9792ae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -22,10 +22,10 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.roaringbitmap.RoaringBitmap;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index e6536d7fe4..71dbe1127f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -31,14 +31,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
@@ -239,7 +239,7 @@ public class SelectionOperatorUtils {
     ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
     int numColumns = storedColumnDataTypes.length;
 
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     RoaringBitmap[] nullBitmaps = null;
     if (nullHandlingEnabled) {
       nullBitmaps = new RoaringBitmap[numColumns];
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index c2162a591e..6e9bafeb71 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;
 
 
 /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
index a6641d12b7..a5ef07b2e5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java
@@ -21,10 +21,10 @@ package org.apache.pinot.core.transport;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 0c2d124ccf..886dd68446 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -38,15 +38,15 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.server.access.AccessControl;
@@ -154,7 +154,8 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
       String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
       long requestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
       LOGGER.error("Exception while processing instance request: {}", hexString, e);
-      sendErrorResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs, DataTableFactory.getEmptyDataTable(), e);
+      sendErrorResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs,
+          DataTableBuilderUtils.getEmptyDataTable(), e);
     }
   }
 
@@ -197,7 +198,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
         } else {
           // Send exception response.
           sendErrorResponse(ctx, queryRequest.getRequestId(), tableNameWithType, queryArrivalTimeMs,
-              DataTableFactory.getEmptyDataTable(), new Exception("Null query response."));
+              DataTableBuilderUtils.getEmptyDataTable(), new Exception("Null query response."));
         }
       }
 
@@ -224,7 +225,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
           e = new Exception(t);
         }
         sendErrorResponse(ctx, instanceRequest.getRequestId(), tableNameWithType, queryArrivalTimeMs,
-            DataTableFactory.getEmptyDataTable(), e);
+            DataTableBuilderUtils.getEmptyDataTable(), e);
       }
     };
   }
@@ -235,7 +236,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
     // will only be called if for some remote reason we are unable to handle exceptions in channelRead0.
     String message = "Unhandled Exception in " + getClass().getCanonicalName();
     LOGGER.error(message, cause);
-    sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), DataTableFactory.getEmptyDataTable(),
+    sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), DataTableBuilderUtils.getEmptyDataTable(),
         new Exception(message, cause));
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 06232b1e15..f2cb638d00 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -27,12 +27,12 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java
index e358e0051d..f669eb6909 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerResponse.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.transport;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.datatable.DataTable;
 
 
 /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index bf8f2aa1c9..579f0691a5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -33,12 +33,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server.ServerRequest;
 import org.apache.pinot.common.proto.Server.ServerResponse;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
 import org.apache.pinot.core.query.executor.QueryExecutor;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 70ca2e067e..a359bcd629 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -23,11 +23,16 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.ColumnarDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.RowDataBlock;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
@@ -82,7 +87,7 @@ public class DataBlockTest {
     DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
         columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
     List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT, nullPercentile);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema, true);
     DataTable dataBlockFromDataTable = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index a85ca126db..9c0dd4786f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
similarity index 90%
rename from pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
index 9cd35843c9..7ef912bd6d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
@@ -19,9 +19,9 @@
 package org.apache.pinot.core.common.datatable;
 
 import java.io.IOException;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -32,14 +32,14 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
 
-public class DataTableUtilsTest {
+public class DataTableBuilderUtilsTest {
 
   @Test
   public void testBuildEmptyDataTable()
       throws IOException {
     // Selection
     QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable WHERE foo = 'bar'");
-    DataTable dataTable = DataTableUtils.buildEmptyDataTable(queryContext);
+    DataTable dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
     DataSchema dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"*"});
     assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING});
@@ -48,7 +48,7 @@ public class DataTableUtilsTest {
     // Aggregation
     queryContext =
         QueryContextConverterUtils.getQueryContext("SELECT COUNT(*), SUM(a), MAX(b) FROM testTable WHERE foo = 'bar'");
-    dataTable = DataTableUtils.buildEmptyDataTable(queryContext);
+    dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
     dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"count_star", "sum_a", "max_b"});
     assertEquals(dataSchema.getColumnDataTypes(),
@@ -61,7 +61,7 @@ public class DataTableUtilsTest {
     // Group-by
     queryContext = QueryContextConverterUtils.getQueryContext(
         "SELECT c, d, COUNT(*), SUM(a), MAX(b) FROM testTable WHERE foo = 'bar' GROUP BY c, d");
-    dataTable = DataTableUtils.buildEmptyDataTable(queryContext);
+    dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
     dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"c", "d", "count(*)", "sum(a)", "max(b)"});
     assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{
@@ -71,7 +71,7 @@ public class DataTableUtilsTest {
 
     // Distinct
     queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT a, b FROM testTable WHERE foo = 'bar'");
-    dataTable = DataTableUtils.buildEmptyDataTable(queryContext);
+    dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
     dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"});
     assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.OBJECT});
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 57b873a3bb..86f28d1f6b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -31,13 +31,14 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
@@ -107,7 +108,7 @@ public class DataTableSerDeTest {
         QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, exception);
     String expected = processingException.getMessage();
 
-    DataTable dataTable = DataTableFactory.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
     dataTable.addException(processingException);
     DataTable newDataTable = DataTableFactory.getDataTable(dataTable.toBytes());
     Assert.assertNull(newDataTable.getDataSchema());
@@ -125,7 +126,7 @@ public class DataTableSerDeTest {
 
     DataSchema dataSchema = new DataSchema(new String[]{"SV", "MV"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING_ARRAY});
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
       dataTableBuilder.startRow();
       dataTableBuilder.setColumn(0, emptyString);
@@ -155,7 +156,7 @@ public class DataTableSerDeTest {
     }
 
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
 
     DataTable dataTable = dataTableBuilder.build();
@@ -182,8 +183,8 @@ public class DataTableSerDeTest {
 
     // Verify V4 broker can deserialize data table (has data, but has no metadata) send by V3 server
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
 
     DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
@@ -205,7 +206,7 @@ public class DataTableSerDeTest {
     Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
 
     // Verify V4 broker can deserialize data table (only has metadata) send by V3 server
-    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -218,8 +219,8 @@ public class DataTableSerDeTest {
     // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement
     // disabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    DataTableBuilder dataTableBuilderV4WithDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilder dataTableBuilderV4WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns);
     DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
     // Deserialize data table bytes as V4
@@ -243,7 +244,7 @@ public class DataTableSerDeTest {
 
     // Verify V4 broker can deserialize data table (only has metadata) send by V4 server(with
     // ThreadCpuTimeMeasurement disabled)
-    DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -256,7 +257,7 @@ public class DataTableSerDeTest {
     // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement
     // enabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
     // Deserialize data table bytes as V4
     newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
@@ -312,8 +313,8 @@ public class DataTableSerDeTest {
 
     // Verify V3 broker can deserialize data table (has data, but has no metadata) send by V2 server
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_2);
-    DataTableBuilder dataTableBuilderV2WithDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_2);
+    DataTableBuilder dataTableBuilderV2WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, columnDataTypes, numColumns);
 
     DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create a V2 data table
@@ -335,7 +336,7 @@ public class DataTableSerDeTest {
     Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
 
     // Verify V3 broker can deserialize data table (only has metadata) send by V2 server
-    DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a V2 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -348,8 +349,8 @@ public class DataTableSerDeTest {
     // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement
     // disabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
     DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
     // Deserialize data table bytes as V3
@@ -373,7 +374,7 @@ public class DataTableSerDeTest {
 
     // Verify V3 broker can deserialize data table (only has metadata) send by V3 server(with
     // ThreadCpuTimeMeasurement disabled)
-    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -386,7 +387,7 @@ public class DataTableSerDeTest {
     // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement
     // enabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
     // Deserialize data table bytes as V3
     newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
@@ -439,7 +440,7 @@ public class DataTableSerDeTest {
     }
 
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
 
     DataTable dataTable = dataTableBuilder.build();
@@ -475,8 +476,8 @@ public class DataTableSerDeTest {
 
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
 
     DataTable dataTable = dataTableBuilder.build();
@@ -543,7 +544,7 @@ public class DataTableSerDeTest {
       DataSchema.ColumnDataType[] columnDataTypes, int numColumns)
       throws IOException {
     RoaringBitmap[] nullBitmaps = null;
-    if (DataTableFactory.getDataTableVersion() >= DataTableFactory.VERSION_4) {
+    if (DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4) {
       nullBitmaps = new RoaringBitmap[numColumns];
       for (int colId = 0; colId < numColumns; colId++) {
         nullBitmaps[colId] = new RoaringBitmap();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index eb1e07a69e..d9cda7ad6d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -31,10 +31,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 89880795ea..a62ec50dbb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -28,9 +28,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
index 2db3a4cc01..f7656a55d9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
@@ -22,15 +22,15 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -54,7 +54,7 @@ public class BrokerReduceServiceTest {
         CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable GROUP BY col1");
     DataSchema dataSchema =
         new DataSchema(new String[]{"col1", "count(*)"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
-    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     int numGroups = 5000;
     for (int i = 0; i < numGroups; i++) {
       dataTableBuilder.startRow();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 927b9cc7f6..527744b795 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -38,12 +38,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -305,7 +306,7 @@ public class PrioritySchedulerTest {
           throw new RuntimeException(e);
         }
       }
-      DataTable result = DataTableFactory.getEmptyDataTable();
+      DataTable result = DataTableBuilderUtils.getEmptyDataTable();
       result.getMetadata().put(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType());
       if (_useBarrier) {
         try {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
index d2cd40a68b..5b74a2b91d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
@@ -25,9 +25,9 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index f2752ef0ac..a6deca642e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -22,12 +22,12 @@ import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.server.access.AccessControl;
@@ -85,7 +85,7 @@ public class QueryRoutingTest {
   public void testValidResponse()
       throws Exception {
     long requestId = 123;
-    DataTable dataTable = DataTableFactory.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
@@ -163,7 +163,7 @@ public class QueryRoutingTest {
   public void testNonMatchingRequestId()
       throws Exception {
     long requestId = 123;
-    DataTable dataTable = DataTableFactory.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
@@ -196,7 +196,7 @@ public class QueryRoutingTest {
     // To avoid flakyness, set timeoutMs to 2000 msec. For some test runs, it can take up to
     // 1400 msec to mark request as failed.
     long timeoutMs = 2000L;
-    DataTable dataTable = DataTableFactory.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
index ee4274cc53..960bde9cca 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
@@ -26,11 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -284,7 +285,7 @@ public class AllNullQueriesTest extends BaseQueriesTest {
 
   public void testQueries(ColumnDataType columnDataType, File indexDir)
       throws IOException {
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("enableNullHandling", "true");
     DataType dataType = columnDataType.toDataType();
@@ -578,7 +579,7 @@ public class AllNullQueriesTest extends BaseQueriesTest {
         assertNull(rows.get(0)[0]);
       }
     }
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     _indexSegment.destroy();
     FileUtils.deleteDirectory(indexDir);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index e73069eb69..d449ec5f8b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -26,12 +26,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.plan.maker.PlanMaker;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
index 05c2ed7480..d44a266b02 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
@@ -28,11 +28,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -159,7 +160,7 @@ public class BigDecimalQueriesTest extends BaseQueriesTest {
   }
 
   public void testQueries() {
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("enableNullHandling", "true");
     {
@@ -449,7 +450,7 @@ public class BigDecimalQueriesTest extends BaseQueriesTest {
         i++;
       }
     }
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
index 4c3fdd40ba..63443449d9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
@@ -27,11 +27,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -180,7 +181,7 @@ public class BooleanNullEnabledQueriesTest extends BaseQueriesTest {
   }
 
   public void testQueries() {
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("enableNullHandling", "true");
     HashSet<Integer> trueIndices = new HashSet<Integer>(Arrays.asList(1, 3, 5));
@@ -449,7 +450,7 @@ public class BooleanNullEnabledQueriesTest extends BaseQueriesTest {
       assertEquals(thirdRow[0], (long) _nullValuesCount * 4);
       assertNull(thirdRow[1]);
     }
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java
index 56089c4986..75d2cf96b9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java
@@ -30,7 +30,7 @@ import java.util.Random;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationOperator;
@@ -135,7 +135,7 @@ public class DistinctCountBitmapQueriesTest extends BaseQueriesTest {
       // Store serialized bitmaps in the BYTES column
       RoaringBitmap bitmap = new RoaringBitmap();
       bitmap.add(value);
-      byte[] bytesValue = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(bitmap);
+      byte[] bytesValue = RoaringBitmapUtils.serialize(bitmap);
       record.putValue(BYTES_COLUMN, bytesValue);
       records.add(record);
     }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index 3fcbb39328..ecd0745ec9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -33,15 +33,15 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.ExplainPlanRows;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.query.executor.QueryExecutor;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 6eff2ee986..1be23d77d7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -30,7 +31,6 @@ import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
 import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
index f617209fc5..e4ba37268c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
@@ -28,11 +28,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -213,7 +214,7 @@ public class NullEnabledQueriesTest extends BaseQueriesTest {
   }
 
   public void testQueries(Number baseValue, ColumnDataType dataType) {
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("enableNullHandling", "true");
     {
@@ -652,7 +653,7 @@ public class NullEnabledQueriesTest extends BaseQueriesTest {
       }
       assertNull(rows.get(rows.size() - 1)[0]);
     }
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index d529e951cf..682cfa5716 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -35,9 +35,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.query.executor.QueryExecutor;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index ccc43197c1..79a1c9c786 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -25,7 +25,8 @@ import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.Connection;
 import org.apache.pinot.client.ConnectionFactory;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -80,7 +81,7 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
     waitForAllDocsLoaded(600_000L);
 
     // Setting data table version to 4
-    DataTableFactory.setDataTableVersion(4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
   }
 
   @Test
@@ -131,9 +132,6 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS
   @AfterClass
   public void tearDown()
       throws Exception {
-    // Setting data table version to 4
-    DataTableFactory.setDataTableVersion(3);
-
     dropOfflineTable(DEFAULT_TABLE_NAME);
 
     stopServer();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 7468d8065c..168c1fb352 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -24,13 +24,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index 60acfe33df..8f5199e6a8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -27,9 +27,9 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6cca6fabb7..c26ea3bffe 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -27,14 +27,14 @@ import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
-import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index d9b4c31db0..3acd06cabd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -21,17 +21,17 @@ package org.apache.pinot.query.runtime.blocks;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.List;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.ColumnarDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.RowDataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.ColumnarDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
-import org.apache.pinot.core.common.datablock.RowDataBlock;
 
 
 /**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index 4900ec193f..1a47be638d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -19,8 +19,8 @@
 package org.apache.pinot.query.runtime.blocks;
 
 import java.util.Map;
+import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
 
 
 public final class TransferableBlockUtils {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 0bccbc3b36..5f0ed67d08 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -23,8 +23,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.mailbox.MailboxService;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index cd184fe89f..b304ed5903 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -25,10 +25,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index ab5356c0a4..f1ab55061e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -21,10 +21,10 @@ package org.apache.pinot.query.runtime.operator;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index bd6142edc4..cb43ff09c0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -24,10 +24,10 @@ import java.util.HashMap;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 9ded3df8f2..9ad6c6fb22 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -21,9 +21,9 @@ package org.apache.pinot.query.runtime.operator;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 81481b6311..ad05f207ef 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -24,13 +24,13 @@ import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
-import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 1a12a1177b..32cff01b8e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -29,12 +29,12 @@ import java.util.Random;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockBuilder;
-import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 74b415512b..b3741cb28f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -25,9 +25,9 @@ import java.util.List;
 import java.util.PriorityQueue;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 66410cb0d9..0c947e1ca6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -21,10 +21,10 @@ package org.apache.pinot.query.runtime.operator;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 1288df0af1..700881e04e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -28,15 +28,15 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.util.Pair;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 9ba1b52ddc..23a77388ee 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -24,11 +24,11 @@ import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.datablock.BaseDataBlock;
-import org.apache.pinot.core.common.datablock.DataBlockUtils;
-import org.apache.pinot.core.common.datablock.MetadataBlock;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.util.TestUtils;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 9ad1723362..916a12ff1a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -31,7 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestUtils;
@@ -125,7 +126,7 @@ public class QueryRunnerTestBase extends QueryTestSet {
   @BeforeClass
   public void setUp()
       throws Exception {
-    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     QueryServerEnclosure server1 = new QueryServerEnclosure(
         ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", INDEX_DIR_S1_C, "d_O", INDEX_DIR_S1_D),
         QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
@@ -161,7 +162,7 @@ public class QueryRunnerTestBase extends QueryTestSet {
 
   @AfterClass
   public void tearDown() {
-    DataTableFactory.setDataTableVersion(DataTableFactory.DEFAULT_VERSION);
+    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     for (QueryServerEnclosure server : _servers.values()) {
       server.shutDown();
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
index fa19e3009e..5da5ae1963 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
-import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.roaringbitmap.RoaringBitmap;
@@ -84,11 +84,11 @@ public class DistinctCountBitmapValueAggregator implements ValueAggregator<Objec
 
   @Override
   public byte[] serializeAggregatedValue(RoaringBitmap value) {
-    return CustomSerDeUtils.ROARING_BITMAP_SER_DE.serialize(value);
+    return RoaringBitmapUtils.serialize(value);
   }
 
   @Override
   public RoaringBitmap deserializeAggregatedValue(byte[] bytes) {
-    return CustomSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytes);
+    return RoaringBitmapUtils.deserialize(bytes);
   }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 9e955ca1f2..a99af9e3d7 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -53,6 +53,7 @@ import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.restlet.resources.SystemResourceInfo;
 import org.apache.pinot.common.utils.ServiceStartableUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -61,10 +62,9 @@ import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
-import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
@@ -181,13 +181,13 @@ public abstract class BaseServerStarter implements ServiceStartable {
 
     // Set data table version send to broker.
     int dataTableVersion =
-        _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
-    if (dataTableVersion > Server.DEFAULT_CURRENT_DATA_TABLE_VERSION) {
+        _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, DataTableBuilderFactory.DEFAULT_VERSION);
+    if (dataTableVersion > DataTableBuilderFactory.DEFAULT_VERSION) {
       LOGGER.warn("Setting experimental DataTable version newer than default via config could result in"
           + " backward-compatibility issues. Current default DataTable version: "
-          + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
+          + DataTableBuilderFactory.DEFAULT_VERSION);
     }
-    DataTableFactory.setDataTableVersion(dataTableVersion);
+    DataTableBuilderFactory.setDataTableVersion(dataTableVersion);
 
     LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: {}, instanceId: {}", _zkAddress,
         _helixClusterName, _instanceId);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5b29a0015c..fc43d340ad 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -595,7 +595,6 @@ public class CommonConstants {
     public static final boolean DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT = false;
 
     public static final String CONFIG_OF_CURRENT_DATA_TABLE_VERSION = "pinot.server.instance.currentDataTableVersion";
-    public static final int DEFAULT_CURRENT_DATA_TABLE_VERSION = 3;
 
     // Environment Provider Configs
     public static final String PREFIX_OF_CONFIG_OF_ENVIRONMENT_PROVIDER_FACTORY =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org