You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/27 20:49:50 UTC

[pinot] branch master updated: Decouple ser/de from DataTable (#9468)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a6ad9f75cb Decouple ser/de from DataTable (#9468)
a6ad9f75cb is described below

commit a6ad9f75cb3e0ed751263a5648ae26974e14edb5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 27 13:49:44 2022 -0700

    Decouple ser/de from DataTable (#9468)
---
 .../org/apache/pinot/common/utils/DataTable.java   | 24 ++++++-
 .../apache/pinot/core/common/ObjectSerDeUtils.java | 68 ++++++++------------
 .../pinot/core/common/datablock/BaseDataBlock.java | 73 +++++++++++-----------
 .../core/common/datablock/DataBlockBuilder.java    |  9 +--
 .../pinot/core/common/datatable/BaseDataTable.java | 59 ++++++++---------
 .../common/datatable/BaseDataTableBuilder.java     |  9 +--
 .../core/common/datatable/DataTableBuilder.java    |  3 +-
 .../function/AggregationFunctionUtils.java         |  6 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  | 48 +++++++-------
 .../query/reduce/DistinctDataTableReducer.java     |  5 +-
 .../core/query/reduce/GroupByDataTableReducer.java |  7 ++-
 .../core/common/datablock/DataBlockTestUtils.java  |  2 +-
 .../core/common/datatable/DataTableSerDeTest.java  | 31 +++++----
 .../core/common/datatable/DataTableUtilsTest.java  |  9 +--
 14 files changed, 192 insertions(+), 161 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 90d0a1128f..1d86715bcf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -67,8 +68,6 @@ public interface DataTable {
 
   ByteArray getBytes(int rowId, int colId);
 
-  <T> T getObject(int rowId, int colId);
-
   int[] getIntArray(int rowId, int colId);
 
   long[] getLongArray(int rowId, int colId);
@@ -79,6 +78,9 @@ public interface DataTable {
 
   String[] getStringArray(int rowId, int colId);
 
+  @Nullable
+  CustomObject getCustomObject(int rowId, int colId);
+
   @Nullable
   RoaringBitmap getNullRowIds(int colId);
 
@@ -86,6 +88,24 @@ public interface DataTable {
 
   DataTable toDataOnlyDataTable();
 
+  class CustomObject {
+    private final int _type;
+    private final ByteBuffer _buffer;
+
+    public CustomObject(int type, ByteBuffer buffer) {
+      _type = type;
+      _buffer = buffer;
+    }
+
+    public int getType() {
+      return _type;
+    }
+
+    public ByteBuffer getBuffer() {
+      return _buffer;
+    }
+  }
+
   enum MetadataValueType {
     INT, LONG, STRING
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index fb209b8814..b0781bf88e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.common;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.tdunning.math.stats.MergingDigest;
@@ -54,9 +55,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
+import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
@@ -86,6 +87,8 @@ public class ObjectSerDeUtils {
   private ObjectSerDeUtils() {
   }
 
+  public static final int NULL_TYPE_VALUE = 100;
+
   public enum ObjectType {
     // NOTE: DO NOT change the value, we rely on the value to indicate the object type
     String(0),
@@ -120,8 +123,8 @@ public class ObjectSerDeUtils {
     FloatLongPair(29),
     DoubleLongPair(30),
     StringLongPair(31),
-    CovarianceTuple(32),
-    Null(100);
+    CovarianceTuple(32);
+
     private final int _value;
 
     ObjectType(int value) {
@@ -132,11 +135,7 @@ public class ObjectSerDeUtils {
       return _value;
     }
 
-    public static ObjectType getObjectType(@Nullable Object value) {
-      if (value == null) {
-        return ObjectType.Null;
-      }
-
+    public static ObjectType getObjectType(Object value) {
       if (value instanceof String) {
         return ObjectType.String;
       } else if (value instanceof Long) {
@@ -358,8 +357,7 @@ public class ObjectSerDeUtils {
     }
   };
 
-  public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE
-      = new ObjectSerDe<IntLongPair>() {
+  public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE = new ObjectSerDe<IntLongPair>() {
 
     @Override
     public byte[] serialize(IntLongPair intLongPair) {
@@ -377,8 +375,7 @@ public class ObjectSerDeUtils {
     }
   };
 
-  public static final ObjectSerDe<LongLongPair> LONG_LONG_PAIR_SER_DE
-      = new ObjectSerDe<LongLongPair>() {
+  public static final ObjectSerDe<LongLongPair> LONG_LONG_PAIR_SER_DE = new ObjectSerDe<LongLongPair>() {
 
     @Override
     public byte[] serialize(LongLongPair longLongPair) {
@@ -396,8 +393,7 @@ public class ObjectSerDeUtils {
     }
   };
 
-  public static final ObjectSerDe<FloatLongPair> FLOAT_LONG_PAIR_SER_DE
-      = new ObjectSerDe<FloatLongPair>() {
+  public static final ObjectSerDe<FloatLongPair> FLOAT_LONG_PAIR_SER_DE = new ObjectSerDe<FloatLongPair>() {
 
     @Override
     public byte[] serialize(FloatLongPair floatLongPair) {
@@ -414,8 +410,7 @@ public class ObjectSerDeUtils {
       return FloatLongPair.fromByteBuffer(byteBuffer);
     }
   };
-  public static final ObjectSerDe<DoubleLongPair> DOUBLE_LONG_PAIR_SER_DE
-      = new ObjectSerDe<DoubleLongPair>() {
+  public static final ObjectSerDe<DoubleLongPair> DOUBLE_LONG_PAIR_SER_DE = new ObjectSerDe<DoubleLongPair>() {
 
     @Override
     public byte[] serialize(DoubleLongPair doubleLongPair) {
@@ -432,8 +427,7 @@ public class ObjectSerDeUtils {
       return DoubleLongPair.fromByteBuffer(byteBuffer);
     }
   };
-  public static final ObjectSerDe<StringLongPair> STRING_LONG_PAIR_SER_DE
-      = new ObjectSerDe<StringLongPair>() {
+  public static final ObjectSerDe<StringLongPair> STRING_LONG_PAIR_SER_DE = new ObjectSerDe<StringLongPair>() {
 
     @Override
     public byte[] serialize(StringLongPair stringLongPair) {
@@ -608,11 +602,11 @@ public class ObjectSerDeUtils {
       }
 
       // De-serialize each key-value pair
-      int keyTypeValue = byteBuffer.getInt();
-      int valueTypeValue = byteBuffer.getInt();
+      ObjectSerDe keySerDe = SER_DES[byteBuffer.getInt()];
+      ObjectSerDe valueSerDe = SER_DES[byteBuffer.getInt()];
       for (int i = 0; i < size; i++) {
-        Object key = ObjectSerDeUtils.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()), keyTypeValue);
-        Object value = ObjectSerDeUtils.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()), valueTypeValue);
+        Object key = keySerDe.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()));
+        Object value = valueSerDe.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()));
         map.put(key, value);
       }
       return map;
@@ -1000,12 +994,12 @@ public class ObjectSerDeUtils {
 
       // De-serialize the values
       if (size != 0) {
-        int valueType = byteBuffer.getInt();
+        ObjectSerDe serDe = SER_DES[byteBuffer.getInt()];
         for (int i = 0; i < size; i++) {
           int numBytes = byteBuffer.getInt();
           ByteBuffer slice = byteBuffer.slice();
           slice.limit(numBytes);
-          list.add(ObjectSerDeUtils.deserialize(slice, valueType));
+          list.add(serDe.deserialize(slice));
           byteBuffer.position(byteBuffer.position() + numBytes);
         }
       }
@@ -1197,31 +1191,21 @@ public class ObjectSerDeUtils {
   };
   //@formatter:on
 
-  public static byte[] serialize(Object value) {
-    return serialize(value, ObjectType.getObjectType(value)._value);
-  }
-
-  public static byte[] serialize(Object value, ObjectType objectType) {
-    return serialize(value, objectType._value);
-  }
-
   public static byte[] serialize(Object value, int objectTypeValue) {
     return SER_DES[objectTypeValue].serialize(value);
   }
 
-  public static <T> T deserialize(byte[] bytes, ObjectType objectType) {
-    return deserialize(bytes, objectType._value);
-  }
-
-  public static <T> T deserialize(byte[] bytes, int objectTypeValue) {
-    return (T) SER_DES[objectTypeValue].deserialize(bytes);
+  public static <T> T deserialize(DataTable.CustomObject customObject) {
+    return (T) SER_DES[customObject.getType()].deserialize(customObject.getBuffer());
   }
 
-  public static <T> T deserialize(ByteBuffer byteBuffer, ObjectType objectType) {
-    return deserialize(byteBuffer, objectType._value);
+  @VisibleForTesting
+  public static byte[] serialize(Object value) {
+    return serialize(value, ObjectType.getObjectType(value)._value);
   }
 
-  public static <T> T deserialize(ByteBuffer byteBuffer, int objectTypeValue) {
-    return (T) SER_DES[objectTypeValue].deserialize(byteBuffer);
+  @VisibleForTesting
+  public static <T> T deserialize(byte[] bytes, ObjectType objectType) {
+    return (T) SER_DES[objectType._value].deserialize(bytes);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index ce7da42063..aa55d4cfb7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -151,7 +151,6 @@ public abstract class BaseDataBlock implements DataTable {
     int variableSizeDataStart = byteBuffer.getInt();
     int variableSizeDataLength = byteBuffer.getInt();
 
-
     // Read exceptions.
     if (exceptionsLength != 0) {
       byteBuffer.position(exceptionsStart);
@@ -249,28 +248,6 @@ public abstract class BaseDataBlock implements DataTable {
     return _numRows;
   }
 
-  @Nullable
-  @Override
-  public RoaringBitmap getNullRowIds(int colId) {
-    // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
-    int position = _fixDataSize + colId * Integer.BYTES * 2;
-    if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
-      return null;
-    }
-
-    _fixedSizeData.position(position);
-    int offset = _fixedSizeData.getInt();
-    int bytesLength = _fixedSizeData.getInt();
-    if (bytesLength > 0) {
-      _variableSizeData.position(offset);
-      byte[] nullBitmapBytes = new byte[bytesLength];
-      _variableSizeData.get(nullBitmapBytes);
-      return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
-    } else {
-      return null;
-    }
-  }
-
   // --------------------------------------------------------------------------
   // Fixed sized element access.
   // --------------------------------------------------------------------------
@@ -320,19 +297,6 @@ public abstract class BaseDataBlock implements DataTable {
   // Variable sized element access.
   // --------------------------------------------------------------------------
 
-  @Override
-  public <T> T getObject(int rowId, int colId) {
-    int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
-    int objectTypeValue = _variableSizeData.getInt();
-    if (size == 0) {
-      assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
-      return null;
-    }
-    ByteBuffer byteBuffer = _variableSizeData.slice();
-    byteBuffer.limit(size);
-    return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
-  }
-
   @Override
   public int[] getIntArray(int rowId, int colId) {
     int length = positionOffsetInVariableBufferAndGetLength(rowId, colId);
@@ -383,6 +347,41 @@ public abstract class BaseDataBlock implements DataTable {
     return strings;
   }
 
+  @Nullable
+  @Override
+  public CustomObject getCustomObject(int rowId, int colId) {
+    int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
+    int type = _variableSizeData.getInt();
+    if (size == 0) {
+      assert type == ObjectSerDeUtils.NULL_TYPE_VALUE;
+      return null;
+    }
+    ByteBuffer buffer = _variableSizeData.slice();
+    buffer.limit(size);
+    return new CustomObject(type, buffer);
+  }
+
+  @Nullable
+  @Override
+  public RoaringBitmap getNullRowIds(int colId) {
+    // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
+    int position = _fixDataSize + colId * Integer.BYTES * 2;
+    if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
+      return null;
+    }
+    _fixedSizeData.position(position);
+    int offset = _fixedSizeData.getInt();
+    int bytesLength = _fixedSizeData.getInt();
+    if (bytesLength > 0) {
+      _variableSizeData.position(offset);
+      byte[] nullBitmapBytes = new byte[bytesLength];
+      _variableSizeData.get(nullBitmapBytes);
+      return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
+    } else {
+      return null;
+    }
+  }
+
   // --------------------------------------------------------------------------
   // Ser/De and exception handling
   // --------------------------------------------------------------------------
@@ -646,7 +645,7 @@ public abstract class BaseDataBlock implements DataTable {
     }
 
     StringBuilder stringBuilder = new StringBuilder();
-    stringBuilder.append(_dataSchema.toString()).append('\n');
+    stringBuilder.append(_dataSchema).append('\n');
     stringBuilder.append("numRows: ").append(_numRows).append('\n');
 
     DataSchema.ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index e4f9a76a85..5c79e291f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -443,14 +443,15 @@ public class DataBlockBuilder {
     builder._variableSizeDataByteArrayOutputStream.write(bytes);
   }
 
-  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, Object value)
+  // TODO: Move ser/de into AggregationFunction interface
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, @Nullable Object value)
       throws IOException {
     byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
-    int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
-    if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+    if (value == null) {
       byteBuffer.putInt(0);
-      builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+      builder._variableSizeDataOutputStream.writeInt(ObjectSerDeUtils.NULL_TYPE_VALUE);
     } else {
+      int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
       byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
       byteBuffer.putInt(bytes.length);
       builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 351819578d..6b5c6d4564 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -25,6 +25,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
@@ -118,22 +119,22 @@ public abstract class BaseDataTable implements DataTable {
    */
   protected Map<String, Map<Integer, String>> deserializeDictionaryMap(ByteBuffer buffer)
       throws IOException {
-      int numDictionaries = buffer.getInt();
-      Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries);
-
-      for (int i = 0; i < numDictionaries; i++) {
-        String column = DataTableUtils.decodeString(buffer);
-        int dictionarySize = buffer.getInt();
-        Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
-        for (int j = 0; j < dictionarySize; j++) {
-          int key = buffer.getInt();
-          String value = DataTableUtils.decodeString(buffer);
-          dictionary.put(key, value);
-        }
-        dictionaryMap.put(column, dictionary);
+    int numDictionaries = buffer.getInt();
+    Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries);
+
+    for (int i = 0; i < numDictionaries; i++) {
+      String column = DataTableUtils.decodeString(buffer);
+      int dictionarySize = buffer.getInt();
+      Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
+      for (int j = 0; j < dictionarySize; j++) {
+        int key = buffer.getInt();
+        String value = DataTableUtils.decodeString(buffer);
+        dictionary.put(key, value);
       }
+      dictionaryMap.put(column, dictionary);
+    }
 
-      return dictionaryMap;
+    return dictionaryMap;
   }
 
   @Override
@@ -191,19 +192,6 @@ public abstract class BaseDataTable implements DataTable {
     return BytesUtils.toByteArray(getString(rowId, colId));
   }
 
-  @Override
-  public <T> T getObject(int rowId, int colId) {
-    int size = positionCursorInVariableBuffer(rowId, colId);
-    int objectTypeValue = _variableSizeData.getInt();
-    if (size == 0) {
-      assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
-      return null;
-    }
-    ByteBuffer byteBuffer = _variableSizeData.slice();
-    byteBuffer.limit(size);
-    return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
-  }
-
   @Override
   public int[] getIntArray(int rowId, int colId) {
     int length = positionCursorInVariableBuffer(rowId, colId);
@@ -255,6 +243,21 @@ public abstract class BaseDataTable implements DataTable {
     return strings;
   }
 
+  @Nullable
+  @Override
+  public CustomObject getCustomObject(int rowId, int colId) {
+    int size = positionCursorInVariableBuffer(rowId, colId);
+    int type = _variableSizeData.getInt();
+    if (size == 0) {
+      assert type == ObjectSerDeUtils.NULL_TYPE_VALUE;
+      return null;
+    }
+    ByteBuffer buffer = _variableSizeData.slice();
+    buffer.limit(size);
+    return new CustomObject(type, buffer);
+  }
+
+  @Nullable
   @Override
   public RoaringBitmap getNullRowIds(int colId) {
     return null;
@@ -273,7 +276,7 @@ public abstract class BaseDataTable implements DataTable {
     }
 
     StringBuilder stringBuilder = new StringBuilder();
-    stringBuilder.append(_dataSchema.toString()).append('\n');
+    stringBuilder.append(_dataSchema).append('\n');
     stringBuilder.append("numRows: ").append(_numRows).append('\n');
 
     ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
index a332adb381..63e4623a58 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -94,15 +95,15 @@ public abstract class BaseDataTableBuilder implements DataTableBuilder {
   }
 
   @Override
-  public void setColumn(int colId, Object value)
+  public void setColumn(int colId, @Nullable Object value)
       throws IOException {
     _currentRowDataByteBuffer.position(_columnOffsets[colId]);
     _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
-    if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+    if (value == null) {
       _currentRowDataByteBuffer.putInt(0);
-      _variableSizeDataOutputStream.writeInt(objectTypeValue);
+      _variableSizeDataOutputStream.writeInt(ObjectSerDeUtils.NULL_TYPE_VALUE);
     } else {
+      int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
       byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
       _currentRowDataByteBuffer.putInt(bytes.length);
       _variableSizeDataOutputStream.writeInt(objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index b592c45c60..08a874e484 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -62,7 +62,8 @@ public interface DataTableBuilder {
   void setColumn(int colId, ByteArray value)
       throws IOException;
 
-  void setColumn(int colId, Object value)
+  // TODO: Move ser/de into AggregationFunction interface
+  void setColumn(int colId, @Nullable Object value)
       throws IOException;
 
   void setColumn(int colId, int[] values)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 21593418e9..71c9e3fd73 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
@@ -120,6 +121,8 @@ public class AggregationFunctionUtils {
 
   /**
    * Reads the intermediate result from the {@link DataTable}.
+   *
+   * TODO: Move ser/de into AggregationFunction interface
    */
   public static Object getIntermediateResult(DataTable dataTable, ColumnDataType columnDataType, int rowId, int colId) {
     switch (columnDataType) {
@@ -128,7 +131,8 @@ public class AggregationFunctionUtils {
       case DOUBLE:
         return dataTable.getDouble(rowId, colId);
       case OBJECT:
-        return dataTable.getObject(rowId, colId);
+        DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
+        return customObject != null ? ObjectSerDeUtils.deserialize(customObject) : null;
       default:
         throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index b2dd4c108f..a50ff8b131 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -21,7 +21,9 @@ package org.apache.pinot.core.query.executor;
 import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -46,6 +48,7 @@ import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.ExplainPlanRowData;
 import org.apache.pinot.core.common.ExplainPlanRows;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -65,7 +68,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.util.QueryOptionsUtils;
 import org.apache.pinot.core.util.trace.TraceContext;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -193,8 +195,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
 
     List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
     List<String> notAcquiredSegments = new ArrayList<>();
-    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(
-        segmentsToQuery, notAcquiredSegments);
+    List<SegmentDataManager> segmentDataManagers =
+        tableDataManager.acquireSegments(segmentsToQuery, notAcquiredSegments);
     int numSegmentsAcquired = segmentDataManagers.size();
     List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
     for (SegmentDataManager segmentDataManager : segmentDataManagers) {
@@ -295,8 +297,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     // TODO: Change broker to watch both IdealState and ExternalView to not query the removed segments
     if (notAcquiredSegments.size() > 0) {
       List<String> missingSegments =
-          notAcquiredSegments.stream()
-              .filter(segmentName -> !tableDataManager.isSegmentDeletedRecently(segmentName))
+          notAcquiredSegments.stream().filter(segmentName -> !tableDataManager.isSegmentDeletedRecently(segmentName))
               .collect(Collectors.toList());
       int numMissingSegments = missingSegments.size();
       if (numMissingSegments > 0) {
@@ -374,8 +375,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
       Plan queryPlan =
           enableStreaming ? _planMaker.makeStreamingInstancePlan(selectedSegments, queryContext, executorService,
-              responseObserver, _serverMetrics) : _planMaker.makeInstancePlan(selectedSegments, queryContext,
-              executorService, _serverMetrics);
+              responseObserver, _serverMetrics)
+              : _planMaker.makeInstancePlan(selectedSegments, queryContext, executorService, _serverMetrics);
       planBuildTimer.stopAndRecord();
 
       TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
@@ -400,8 +401,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
     try {
       dataTableBuilder.startRow();
-      dataTableBuilder.setColumn(0, String.format(ExplainPlanRows.PLAN_START_FORMAT,
-          totalNumSegments));
+      dataTableBuilder.setColumn(0, String.format(ExplainPlanRows.PLAN_START_FORMAT, totalNumSegments));
       dataTableBuilder.setColumn(1, ExplainPlanRows.PLAN_START_IDS);
       dataTableBuilder.setColumn(2, ExplainPlanRows.PLAN_START_IDS);
       dataTableBuilder.finishRow();
@@ -522,14 +522,13 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
 
       // Walk through all the explain plans and create the entries in the explain plan output for each plan
       for (ExplainPlanRows explainPlanRows : listOfExplainPlans) {
-        numEmptyFilterSegments += explainPlanRows.isHasEmptyFilter()
-            ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
-        numMatchAllFilterSegments += explainPlanRows.isHasMatchAllFilter()
-            ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+        numEmptyFilterSegments +=
+            explainPlanRows.isHasEmptyFilter() ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+        numMatchAllFilterSegments +=
+            explainPlanRows.isHasMatchAllFilter() ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
         setValueInDataTableBuilder(dataTableBuilder,
-            String.format(ExplainPlanRows.PLAN_START_FORMAT,
-                explainPlanRows.getNumSegmentsMatchingThisPlan()), ExplainPlanRows.PLAN_START_IDS,
-            ExplainPlanRows.PLAN_START_IDS);
+            String.format(ExplainPlanRows.PLAN_START_FORMAT, explainPlanRows.getNumSegmentsMatchingThisPlan()),
+            ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
         for (ExplainPlanRowData explainPlanRowData : explainPlanRows.getExplainPlanRowData()) {
           setValueInDataTableBuilder(dataTableBuilder, explainPlanRowData.getExplainPlanString(),
               explainPlanRowData.getOperatorId(), explainPlanRowData.getParentId());
@@ -540,8 +539,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     }
 
     DataTable dataTable = dataTableBuilder.build();
-    dataTable.getMetadata().put(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
-        String.valueOf(numEmptyFilterSegments));
+    dataTable.getMetadata()
+        .put(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(), String.valueOf(numEmptyFilterSegments));
     dataTable.getMetadata().put(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
         String.valueOf(numMatchAllFilterSegments));
     return dataTable;
@@ -609,16 +608,16 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     }
     List<ExpressionContext> arguments = function.getArguments();
     if (StringUtils.remove(function.getFunctionName(), '_').equalsIgnoreCase(IN_PARTITIONED_SUBQUERY)) {
-      Preconditions.checkState(arguments.size() == 2,
+      Preconditions.checkArgument(arguments.size() == 2,
           "IN_PARTITIONED_SUBQUERY requires 2 arguments: expression, subquery");
       ExpressionContext subqueryExpression = arguments.get(1);
-      Preconditions.checkState(subqueryExpression.getType() == ExpressionContext.Type.LITERAL,
+      Preconditions.checkArgument(subqueryExpression.getType() == ExpressionContext.Type.LITERAL,
           "Second argument of IN_PARTITIONED_SUBQUERY must be a literal (subquery)");
       QueryContext subquery = QueryContextConverterUtils.getQueryContext(subqueryExpression.getLiteral());
       // Subquery should be an ID_SET aggregation only query
       //noinspection rawtypes
       AggregationFunction[] aggregationFunctions = subquery.getAggregationFunctions();
-      Preconditions.checkState(aggregationFunctions != null && aggregationFunctions.length == 1
+      Preconditions.checkArgument(aggregationFunctions != null && aggregationFunctions.length == 1
               && aggregationFunctions[0].getType() == AggregationFunctionType.IDSET
               && subquery.getGroupByExpressions() == null,
           "Subquery in IN_PARTITIONED_SUBQUERY should be an ID_SET aggregation only query, found: %s",
@@ -628,8 +627,11 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       // Make a clone of indexSegments because the method might modify the list
       DataTable dataTable =
           processQuery(new ArrayList<>(indexSegments), subquery, timerContext, executorService, null, false);
-      IdSet idSet = dataTable.getObject(0, 0);
-      String serializedIdSet = idSet.toBase64String();
+      DataTable.CustomObject idSet = dataTable.getCustomObject(0, 0);
+      Preconditions.checkState(idSet != null && idSet.getType() == ObjectSerDeUtils.ObjectType.IdSet.getValue(),
+          "Result is not an IdSet");
+      String serializedIdSet =
+          new String(Base64.getEncoder().encode(idSet.getBuffer()).array(), StandardCharsets.ISO_8859_1);
       // Rewrite the expression
       function.setFunctionName(TransformFunctionType.INIDSET.name());
       arguments.set(1, ExpressionContext.forLiteral(serializedIdSet));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 186bea1748..72507411ab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -66,7 +67,9 @@ public class DistinctDataTableReducer implements DataTableReducer {
     // Gather all non-empty DistinctTables
     List<DistinctTable> nonEmptyDistinctTables = new ArrayList<>(dataTableMap.size());
     for (DataTable dataTable : dataTableMap.values()) {
-      DistinctTable distinctTable = dataTable.getObject(0, 0);
+      DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+      assert customObject != null;
+      DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
       if (!distinctTable.isEmpty()) {
         nonEmptyDistinctTables.add(distinctTable);
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index f04d5b9724..1e2d6c18b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Record;
@@ -313,7 +314,11 @@ public class GroupByDataTableReducer implements DataTableReducer {
                       values[colId] = dataTable.getBytes(rowId, colId);
                       break;
                     case OBJECT:
-                      values[colId] = dataTable.getObject(rowId, colId);
+                      // TODO: Move ser/de into AggregationFunction interface
+                      DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
+                      if (customObject != null) {
+                        values[colId] = ObjectSerDeUtils.deserialize(customObject);
+                      }
                       break;
                     // Add other aggregation intermediate result / group-by column type supports here
                     default:
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index e56ccc422b..a85ca126db 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -144,7 +144,7 @@ public class DataBlockTestUtils {
       case BYTES:
         return dataBlock.getBytes(rowId, colId);
       case OBJECT:
-        return dataBlock.getObject(rowId, colId);
+        return dataBlock.getCustomObject(rowId, colId);
       case BOOLEAN_ARRAY:
       case INT_ARRAY:
         return dataBlock.getIntArray(rowId, colId);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index cbb3c34992..57b873a3bb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
@@ -85,15 +86,15 @@ public class DataTableSerDeTest {
           .put(MetadataKey.TOTAL_DOCS.getName(), String.valueOf(200L))
           .put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true")
           .put(MetadataKey.TIME_USED_MS.getName(), String.valueOf(20000L)).put(MetadataKey.TRACE_INFO.getName(),
-          "StudentException: Error finding students\n"
-              + "        at StudentManager.findStudents(StudentManager.java:13)\n"
-              + "        at StudentProgram.main(StudentProgram.java:9)\n"
-              + "Caused by: DAOException: Error querying students from database\n"
-              + "        at StudentDAO.list(StudentDAO.java:11)\n"
-              + "        at StudentManager.findStudents(StudentManager.java:11)\n" + "        ... 1 more\n"
-              + "Caused by: java.sql.SQLException: Syntax Error\n"
-              + "        at DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n"
-              + "        at StudentDAO.list(StudentDAO.java:8)\n" + "        ... 2 more")
+              "StudentException: Error finding students\n"
+                  + "        at StudentManager.findStudents(StudentManager.java:13)\n"
+                  + "        at StudentProgram.main(StudentProgram.java:9)\n"
+                  + "Caused by: DAOException: Error querying students from database\n"
+                  + "        at StudentDAO.list(StudentDAO.java:11)\n"
+                  + "        at StudentManager.findStudents(StudentManager.java:11)\n" + "        ... 1 more\n"
+                  + "Caused by: java.sql.SQLException: Syntax Error\n"
+                  + "        at DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n"
+                  + "        at StudentDAO.list(StudentDAO.java:8)\n" + "        ... 2 more")
           .put(MetadataKey.REQUEST_ID.getName(), String.valueOf(90181881818L))
           .put(MetadataKey.NUM_RESIZES.getName(), String.valueOf(900L))
           .put(MetadataKey.RESIZE_TIME_MS.getName(), String.valueOf(1919199L)).build();
@@ -703,8 +704,8 @@ public class DataTableSerDeTest {
             Assert.assertEquals(newDataTable.getDouble(rowId, colId), isNull ? 0.0 : DOUBLES[rowId], ERROR_MESSAGE);
             break;
           case BIG_DECIMAL:
-            Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId), isNull ? BigDecimal.ZERO
-                : BIG_DECIMALS[rowId], ERROR_MESSAGE);
+            Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId),
+                isNull ? BigDecimal.ZERO : BIG_DECIMALS[rowId], ERROR_MESSAGE);
             break;
           case BOOLEAN:
             Assert.assertEquals(newDataTable.getInt(rowId, colId), isNull ? 0 : BOOLEANS[rowId], ERROR_MESSAGE);
@@ -723,7 +724,13 @@ public class DataTableSerDeTest {
                 ERROR_MESSAGE);
             break;
           case OBJECT:
-            Assert.assertEquals(newDataTable.getObject(rowId, colId), isNull ? null : OBJECTS[rowId], ERROR_MESSAGE);
+            DataTable.CustomObject customObject = newDataTable.getCustomObject(rowId, colId);
+            if (isNull) {
+              Assert.assertNull(customObject, ERROR_MESSAGE);
+            } else {
+              Assert.assertNotNull(customObject);
+              Assert.assertEquals(ObjectSerDeUtils.deserialize(customObject), OBJECTS[rowId], ERROR_MESSAGE);
+            }
             break;
           case INT_ARRAY:
             Assert.assertTrue(Arrays.equals(newDataTable.getIntArray(rowId, colId), INT_ARRAYS[rowId]), ERROR_MESSAGE);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
index e92789ea48..9cd35843c9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
@@ -22,13 +22,14 @@ import java.io.IOException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
 
 
 public class DataTableUtilsTest {
@@ -75,9 +76,9 @@ public class DataTableUtilsTest {
     assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"});
     assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.OBJECT});
     assertEquals(dataTable.getNumberOfRows(), 1);
-    Object firstObject = dataTable.getObject(0, 0);
-    assertTrue(firstObject instanceof DistinctTable);
-    DistinctTable distinctTable = (DistinctTable) firstObject;
+    DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+    assertNotNull(customObject);
+    DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
     assertEquals(distinctTable.size(), 0);
     assertEquals(distinctTable.getDataSchema().getColumnNames(), new String[]{"a", "b"});
     assertEquals(distinctTable.getDataSchema().getColumnDataTypes(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org