You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/27 20:49:50 UTC
[pinot] branch master updated: Decouple ser/de from DataTable (#9468)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a6ad9f75cb Decouple ser/de from DataTable (#9468)
a6ad9f75cb is described below
commit a6ad9f75cb3e0ed751263a5648ae26974e14edb5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Sep 27 13:49:44 2022 -0700
Decouple ser/de from DataTable (#9468)
---
.../org/apache/pinot/common/utils/DataTable.java | 24 ++++++-
.../apache/pinot/core/common/ObjectSerDeUtils.java | 68 ++++++++------------
.../pinot/core/common/datablock/BaseDataBlock.java | 73 +++++++++++-----------
.../core/common/datablock/DataBlockBuilder.java | 9 +--
.../pinot/core/common/datatable/BaseDataTable.java | 59 ++++++++---------
.../common/datatable/BaseDataTableBuilder.java | 9 +--
.../core/common/datatable/DataTableBuilder.java | 3 +-
.../function/AggregationFunctionUtils.java | 6 +-
.../query/executor/ServerQueryExecutorV1Impl.java | 48 +++++++-------
.../query/reduce/DistinctDataTableReducer.java | 5 +-
.../core/query/reduce/GroupByDataTableReducer.java | 7 ++-
.../core/common/datablock/DataBlockTestUtils.java | 2 +-
.../core/common/datatable/DataTableSerDeTest.java | 31 +++++----
.../core/common/datatable/DataTableUtilsTest.java | 9 +--
14 files changed, 192 insertions(+), 161 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 90d0a1128f..1d86715bcf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
@@ -67,8 +68,6 @@ public interface DataTable {
ByteArray getBytes(int rowId, int colId);
- <T> T getObject(int rowId, int colId);
-
int[] getIntArray(int rowId, int colId);
long[] getLongArray(int rowId, int colId);
@@ -79,6 +78,9 @@ public interface DataTable {
String[] getStringArray(int rowId, int colId);
+ @Nullable
+ CustomObject getCustomObject(int rowId, int colId);
+
@Nullable
RoaringBitmap getNullRowIds(int colId);
@@ -86,6 +88,24 @@ public interface DataTable {
DataTable toDataOnlyDataTable();
+ class CustomObject {
+ private final int _type;
+ private final ByteBuffer _buffer;
+
+ public CustomObject(int type, ByteBuffer buffer) {
+ _type = type;
+ _buffer = buffer;
+ }
+
+ public int getType() {
+ return _type;
+ }
+
+ public ByteBuffer getBuffer() {
+ return _buffer;
+ }
+ }
+
enum MetadataValueType {
INT, LONG, STRING
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index fb209b8814..b0781bf88e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.common;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.tdunning.math.stats.MergingDigest;
@@ -54,9 +55,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
+import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
@@ -86,6 +87,8 @@ public class ObjectSerDeUtils {
private ObjectSerDeUtils() {
}
+ public static final int NULL_TYPE_VALUE = 100;
+
public enum ObjectType {
// NOTE: DO NOT change the value, we rely on the value to indicate the object type
String(0),
@@ -120,8 +123,8 @@ public class ObjectSerDeUtils {
FloatLongPair(29),
DoubleLongPair(30),
StringLongPair(31),
- CovarianceTuple(32),
- Null(100);
+ CovarianceTuple(32);
+
private final int _value;
ObjectType(int value) {
@@ -132,11 +135,7 @@ public class ObjectSerDeUtils {
return _value;
}
- public static ObjectType getObjectType(@Nullable Object value) {
- if (value == null) {
- return ObjectType.Null;
- }
-
+ public static ObjectType getObjectType(Object value) {
if (value instanceof String) {
return ObjectType.String;
} else if (value instanceof Long) {
@@ -358,8 +357,7 @@ public class ObjectSerDeUtils {
}
};
- public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE
- = new ObjectSerDe<IntLongPair>() {
+ public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE = new ObjectSerDe<IntLongPair>() {
@Override
public byte[] serialize(IntLongPair intLongPair) {
@@ -377,8 +375,7 @@ public class ObjectSerDeUtils {
}
};
- public static final ObjectSerDe<LongLongPair> LONG_LONG_PAIR_SER_DE
- = new ObjectSerDe<LongLongPair>() {
+ public static final ObjectSerDe<LongLongPair> LONG_LONG_PAIR_SER_DE = new ObjectSerDe<LongLongPair>() {
@Override
public byte[] serialize(LongLongPair longLongPair) {
@@ -396,8 +393,7 @@ public class ObjectSerDeUtils {
}
};
- public static final ObjectSerDe<FloatLongPair> FLOAT_LONG_PAIR_SER_DE
- = new ObjectSerDe<FloatLongPair>() {
+ public static final ObjectSerDe<FloatLongPair> FLOAT_LONG_PAIR_SER_DE = new ObjectSerDe<FloatLongPair>() {
@Override
public byte[] serialize(FloatLongPair floatLongPair) {
@@ -414,8 +410,7 @@ public class ObjectSerDeUtils {
return FloatLongPair.fromByteBuffer(byteBuffer);
}
};
- public static final ObjectSerDe<DoubleLongPair> DOUBLE_LONG_PAIR_SER_DE
- = new ObjectSerDe<DoubleLongPair>() {
+ public static final ObjectSerDe<DoubleLongPair> DOUBLE_LONG_PAIR_SER_DE = new ObjectSerDe<DoubleLongPair>() {
@Override
public byte[] serialize(DoubleLongPair doubleLongPair) {
@@ -432,8 +427,7 @@ public class ObjectSerDeUtils {
return DoubleLongPair.fromByteBuffer(byteBuffer);
}
};
- public static final ObjectSerDe<StringLongPair> STRING_LONG_PAIR_SER_DE
- = new ObjectSerDe<StringLongPair>() {
+ public static final ObjectSerDe<StringLongPair> STRING_LONG_PAIR_SER_DE = new ObjectSerDe<StringLongPair>() {
@Override
public byte[] serialize(StringLongPair stringLongPair) {
@@ -608,11 +602,11 @@ public class ObjectSerDeUtils {
}
// De-serialize each key-value pair
- int keyTypeValue = byteBuffer.getInt();
- int valueTypeValue = byteBuffer.getInt();
+ ObjectSerDe keySerDe = SER_DES[byteBuffer.getInt()];
+ ObjectSerDe valueSerDe = SER_DES[byteBuffer.getInt()];
for (int i = 0; i < size; i++) {
- Object key = ObjectSerDeUtils.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()), keyTypeValue);
- Object value = ObjectSerDeUtils.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()), valueTypeValue);
+ Object key = keySerDe.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()));
+ Object value = valueSerDe.deserialize(sliceByteBuffer(byteBuffer, byteBuffer.getInt()));
map.put(key, value);
}
return map;
@@ -1000,12 +994,12 @@ public class ObjectSerDeUtils {
// De-serialize the values
if (size != 0) {
- int valueType = byteBuffer.getInt();
+ ObjectSerDe serDe = SER_DES[byteBuffer.getInt()];
for (int i = 0; i < size; i++) {
int numBytes = byteBuffer.getInt();
ByteBuffer slice = byteBuffer.slice();
slice.limit(numBytes);
- list.add(ObjectSerDeUtils.deserialize(slice, valueType));
+ list.add(serDe.deserialize(slice));
byteBuffer.position(byteBuffer.position() + numBytes);
}
}
@@ -1197,31 +1191,21 @@ public class ObjectSerDeUtils {
};
//@formatter:on
- public static byte[] serialize(Object value) {
- return serialize(value, ObjectType.getObjectType(value)._value);
- }
-
- public static byte[] serialize(Object value, ObjectType objectType) {
- return serialize(value, objectType._value);
- }
-
public static byte[] serialize(Object value, int objectTypeValue) {
return SER_DES[objectTypeValue].serialize(value);
}
- public static <T> T deserialize(byte[] bytes, ObjectType objectType) {
- return deserialize(bytes, objectType._value);
- }
-
- public static <T> T deserialize(byte[] bytes, int objectTypeValue) {
- return (T) SER_DES[objectTypeValue].deserialize(bytes);
+ public static <T> T deserialize(DataTable.CustomObject customObject) {
+ return (T) SER_DES[customObject.getType()].deserialize(customObject.getBuffer());
}
- public static <T> T deserialize(ByteBuffer byteBuffer, ObjectType objectType) {
- return deserialize(byteBuffer, objectType._value);
+ @VisibleForTesting
+ public static byte[] serialize(Object value) {
+ return serialize(value, ObjectType.getObjectType(value)._value);
}
- public static <T> T deserialize(ByteBuffer byteBuffer, int objectTypeValue) {
- return (T) SER_DES[objectTypeValue].deserialize(byteBuffer);
+ @VisibleForTesting
+ public static <T> T deserialize(byte[] bytes, ObjectType objectType) {
+ return (T) SER_DES[objectType._value].deserialize(bytes);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index ce7da42063..aa55d4cfb7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -151,7 +151,6 @@ public abstract class BaseDataBlock implements DataTable {
int variableSizeDataStart = byteBuffer.getInt();
int variableSizeDataLength = byteBuffer.getInt();
-
// Read exceptions.
if (exceptionsLength != 0) {
byteBuffer.position(exceptionsStart);
@@ -249,28 +248,6 @@ public abstract class BaseDataBlock implements DataTable {
return _numRows;
}
- @Nullable
- @Override
- public RoaringBitmap getNullRowIds(int colId) {
- // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
- int position = _fixDataSize + colId * Integer.BYTES * 2;
- if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
- return null;
- }
-
- _fixedSizeData.position(position);
- int offset = _fixedSizeData.getInt();
- int bytesLength = _fixedSizeData.getInt();
- if (bytesLength > 0) {
- _variableSizeData.position(offset);
- byte[] nullBitmapBytes = new byte[bytesLength];
- _variableSizeData.get(nullBitmapBytes);
- return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
- } else {
- return null;
- }
- }
-
// --------------------------------------------------------------------------
// Fixed sized element access.
// --------------------------------------------------------------------------
@@ -320,19 +297,6 @@ public abstract class BaseDataBlock implements DataTable {
// Variable sized element access.
// --------------------------------------------------------------------------
- @Override
- public <T> T getObject(int rowId, int colId) {
- int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
- int objectTypeValue = _variableSizeData.getInt();
- if (size == 0) {
- assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
- return null;
- }
- ByteBuffer byteBuffer = _variableSizeData.slice();
- byteBuffer.limit(size);
- return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
- }
-
@Override
public int[] getIntArray(int rowId, int colId) {
int length = positionOffsetInVariableBufferAndGetLength(rowId, colId);
@@ -383,6 +347,41 @@ public abstract class BaseDataBlock implements DataTable {
return strings;
}
+ @Nullable
+ @Override
+ public CustomObject getCustomObject(int rowId, int colId) {
+ int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
+ int type = _variableSizeData.getInt();
+ if (size == 0) {
+ assert type == ObjectSerDeUtils.NULL_TYPE_VALUE;
+ return null;
+ }
+ ByteBuffer buffer = _variableSizeData.slice();
+ buffer.limit(size);
+ return new CustomObject(type, buffer);
+ }
+
+ @Nullable
+ @Override
+ public RoaringBitmap getNullRowIds(int colId) {
+ // _fixedSizeData stores two ints per col's null bitmap: offset, and length.
+ int position = _fixDataSize + colId * Integer.BYTES * 2;
+ if (_fixedSizeData == null || position >= _fixedSizeData.limit()) {
+ return null;
+ }
+ _fixedSizeData.position(position);
+ int offset = _fixedSizeData.getInt();
+ int bytesLength = _fixedSizeData.getInt();
+ if (bytesLength > 0) {
+ _variableSizeData.position(offset);
+ byte[] nullBitmapBytes = new byte[bytesLength];
+ _variableSizeData.get(nullBitmapBytes);
+ return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(nullBitmapBytes);
+ } else {
+ return null;
+ }
+ }
+
// --------------------------------------------------------------------------
// Ser/De and exception handling
// --------------------------------------------------------------------------
@@ -646,7 +645,7 @@ public abstract class BaseDataBlock implements DataTable {
}
StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(_dataSchema.toString()).append('\n');
+ stringBuilder.append(_dataSchema).append('\n');
stringBuilder.append("numRows: ").append(_numRows).append('\n');
DataSchema.ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index e4f9a76a85..5c79e291f6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -443,14 +443,15 @@ public class DataBlockBuilder {
builder._variableSizeDataByteArrayOutputStream.write(bytes);
}
- private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, Object value)
+ // TODO: Move ser/de into AggregationFunction interface
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, @Nullable Object value)
throws IOException {
byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
- int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
- if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+ if (value == null) {
byteBuffer.putInt(0);
- builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
+ builder._variableSizeDataOutputStream.writeInt(ObjectSerDeUtils.NULL_TYPE_VALUE);
} else {
+ int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
byteBuffer.putInt(bytes.length);
builder._variableSizeDataOutputStream.writeInt(objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 351819578d..6b5c6d4564 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -25,6 +25,7 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
@@ -118,22 +119,22 @@ public abstract class BaseDataTable implements DataTable {
*/
protected Map<String, Map<Integer, String>> deserializeDictionaryMap(ByteBuffer buffer)
throws IOException {
- int numDictionaries = buffer.getInt();
- Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries);
-
- for (int i = 0; i < numDictionaries; i++) {
- String column = DataTableUtils.decodeString(buffer);
- int dictionarySize = buffer.getInt();
- Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
- for (int j = 0; j < dictionarySize; j++) {
- int key = buffer.getInt();
- String value = DataTableUtils.decodeString(buffer);
- dictionary.put(key, value);
- }
- dictionaryMap.put(column, dictionary);
+ int numDictionaries = buffer.getInt();
+ Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries);
+
+ for (int i = 0; i < numDictionaries; i++) {
+ String column = DataTableUtils.decodeString(buffer);
+ int dictionarySize = buffer.getInt();
+ Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
+ for (int j = 0; j < dictionarySize; j++) {
+ int key = buffer.getInt();
+ String value = DataTableUtils.decodeString(buffer);
+ dictionary.put(key, value);
}
+ dictionaryMap.put(column, dictionary);
+ }
- return dictionaryMap;
+ return dictionaryMap;
}
@Override
@@ -191,19 +192,6 @@ public abstract class BaseDataTable implements DataTable {
return BytesUtils.toByteArray(getString(rowId, colId));
}
- @Override
- public <T> T getObject(int rowId, int colId) {
- int size = positionCursorInVariableBuffer(rowId, colId);
- int objectTypeValue = _variableSizeData.getInt();
- if (size == 0) {
- assert objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue();
- return null;
- }
- ByteBuffer byteBuffer = _variableSizeData.slice();
- byteBuffer.limit(size);
- return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
- }
-
@Override
public int[] getIntArray(int rowId, int colId) {
int length = positionCursorInVariableBuffer(rowId, colId);
@@ -255,6 +243,21 @@ public abstract class BaseDataTable implements DataTable {
return strings;
}
+ @Nullable
+ @Override
+ public CustomObject getCustomObject(int rowId, int colId) {
+ int size = positionCursorInVariableBuffer(rowId, colId);
+ int type = _variableSizeData.getInt();
+ if (size == 0) {
+ assert type == ObjectSerDeUtils.NULL_TYPE_VALUE;
+ return null;
+ }
+ ByteBuffer buffer = _variableSizeData.slice();
+ buffer.limit(size);
+ return new CustomObject(type, buffer);
+ }
+
+ @Nullable
@Override
public RoaringBitmap getNullRowIds(int colId) {
return null;
@@ -273,7 +276,7 @@ public abstract class BaseDataTable implements DataTable {
}
StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(_dataSchema.toString()).append('\n');
+ stringBuilder.append(_dataSchema).append('\n');
stringBuilder.append("numRows: ").append(_numRows).append('\n');
ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
index a332adb381..63e4623a58 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -94,15 +95,15 @@ public abstract class BaseDataTableBuilder implements DataTableBuilder {
}
@Override
- public void setColumn(int colId, Object value)
+ public void setColumn(int colId, @Nullable Object value)
throws IOException {
_currentRowDataByteBuffer.position(_columnOffsets[colId]);
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
- if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+ if (value == null) {
_currentRowDataByteBuffer.putInt(0);
- _variableSizeDataOutputStream.writeInt(objectTypeValue);
+ _variableSizeDataOutputStream.writeInt(ObjectSerDeUtils.NULL_TYPE_VALUE);
} else {
+ int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
_currentRowDataByteBuffer.putInt(bytes.length);
_variableSizeDataOutputStream.writeInt(objectTypeValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index b592c45c60..08a874e484 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -62,7 +62,8 @@ public interface DataTableBuilder {
void setColumn(int colId, ByteArray value)
throws IOException;
- void setColumn(int colId, Object value)
+ // TODO: Move ser/de into AggregationFunction interface
+ void setColumn(int colId, @Nullable Object value)
throws IOException;
void setColumn(int colId, int[] values)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 21593418e9..71c9e3fd73 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
@@ -120,6 +121,8 @@ public class AggregationFunctionUtils {
/**
* Reads the intermediate result from the {@link DataTable}.
+ *
+ * TODO: Move ser/de into AggregationFunction interface
*/
public static Object getIntermediateResult(DataTable dataTable, ColumnDataType columnDataType, int rowId, int colId) {
switch (columnDataType) {
@@ -128,7 +131,8 @@ public class AggregationFunctionUtils {
case DOUBLE:
return dataTable.getDouble(rowId, colId);
case OBJECT:
- return dataTable.getObject(rowId, colId);
+ DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
+ return customObject != null ? ObjectSerDeUtils.deserialize(customObject) : null;
default:
throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index b2dd4c108f..a50ff8b131 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -21,7 +21,9 @@ package org.apache.pinot.core.query.executor;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -46,6 +48,7 @@ import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.common.ExplainPlanRowData;
import org.apache.pinot.core.common.ExplainPlanRows;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -65,7 +68,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.util.QueryOptionsUtils;
import org.apache.pinot.core.util.trace.TraceContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -193,8 +195,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
List<String> notAcquiredSegments = new ArrayList<>();
- List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(
- segmentsToQuery, notAcquiredSegments);
+ List<SegmentDataManager> segmentDataManagers =
+ tableDataManager.acquireSegments(segmentsToQuery, notAcquiredSegments);
int numSegmentsAcquired = segmentDataManagers.size();
List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
@@ -295,8 +297,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
// TODO: Change broker to watch both IdealState and ExternalView to not query the removed segments
if (notAcquiredSegments.size() > 0) {
List<String> missingSegments =
- notAcquiredSegments.stream()
- .filter(segmentName -> !tableDataManager.isSegmentDeletedRecently(segmentName))
+ notAcquiredSegments.stream().filter(segmentName -> !tableDataManager.isSegmentDeletedRecently(segmentName))
.collect(Collectors.toList());
int numMissingSegments = missingSegments.size();
if (numMissingSegments > 0) {
@@ -374,8 +375,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
Plan queryPlan =
enableStreaming ? _planMaker.makeStreamingInstancePlan(selectedSegments, queryContext, executorService,
- responseObserver, _serverMetrics) : _planMaker.makeInstancePlan(selectedSegments, queryContext,
- executorService, _serverMetrics);
+ responseObserver, _serverMetrics)
+ : _planMaker.makeInstancePlan(selectedSegments, queryContext, executorService, _serverMetrics);
planBuildTimer.stopAndRecord();
TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
@@ -400,8 +401,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
try {
dataTableBuilder.startRow();
- dataTableBuilder.setColumn(0, String.format(ExplainPlanRows.PLAN_START_FORMAT,
- totalNumSegments));
+ dataTableBuilder.setColumn(0, String.format(ExplainPlanRows.PLAN_START_FORMAT, totalNumSegments));
dataTableBuilder.setColumn(1, ExplainPlanRows.PLAN_START_IDS);
dataTableBuilder.setColumn(2, ExplainPlanRows.PLAN_START_IDS);
dataTableBuilder.finishRow();
@@ -522,14 +522,13 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
// Walk through all the explain plans and create the entries in the explain plan output for each plan
for (ExplainPlanRows explainPlanRows : listOfExplainPlans) {
- numEmptyFilterSegments += explainPlanRows.isHasEmptyFilter()
- ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
- numMatchAllFilterSegments += explainPlanRows.isHasMatchAllFilter()
- ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+ numEmptyFilterSegments +=
+ explainPlanRows.isHasEmptyFilter() ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+ numMatchAllFilterSegments +=
+ explainPlanRows.isHasMatchAllFilter() ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
setValueInDataTableBuilder(dataTableBuilder,
- String.format(ExplainPlanRows.PLAN_START_FORMAT,
- explainPlanRows.getNumSegmentsMatchingThisPlan()), ExplainPlanRows.PLAN_START_IDS,
- ExplainPlanRows.PLAN_START_IDS);
+ String.format(ExplainPlanRows.PLAN_START_FORMAT, explainPlanRows.getNumSegmentsMatchingThisPlan()),
+ ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
for (ExplainPlanRowData explainPlanRowData : explainPlanRows.getExplainPlanRowData()) {
setValueInDataTableBuilder(dataTableBuilder, explainPlanRowData.getExplainPlanString(),
explainPlanRowData.getOperatorId(), explainPlanRowData.getParentId());
@@ -540,8 +539,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
DataTable dataTable = dataTableBuilder.build();
- dataTable.getMetadata().put(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
- String.valueOf(numEmptyFilterSegments));
+ dataTable.getMetadata()
+ .put(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(), String.valueOf(numEmptyFilterSegments));
dataTable.getMetadata().put(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
String.valueOf(numMatchAllFilterSegments));
return dataTable;
@@ -609,16 +608,16 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
List<ExpressionContext> arguments = function.getArguments();
if (StringUtils.remove(function.getFunctionName(), '_').equalsIgnoreCase(IN_PARTITIONED_SUBQUERY)) {
- Preconditions.checkState(arguments.size() == 2,
+ Preconditions.checkArgument(arguments.size() == 2,
"IN_PARTITIONED_SUBQUERY requires 2 arguments: expression, subquery");
ExpressionContext subqueryExpression = arguments.get(1);
- Preconditions.checkState(subqueryExpression.getType() == ExpressionContext.Type.LITERAL,
+ Preconditions.checkArgument(subqueryExpression.getType() == ExpressionContext.Type.LITERAL,
"Second argument of IN_PARTITIONED_SUBQUERY must be a literal (subquery)");
QueryContext subquery = QueryContextConverterUtils.getQueryContext(subqueryExpression.getLiteral());
// Subquery should be an ID_SET aggregation only query
//noinspection rawtypes
AggregationFunction[] aggregationFunctions = subquery.getAggregationFunctions();
- Preconditions.checkState(aggregationFunctions != null && aggregationFunctions.length == 1
+ Preconditions.checkArgument(aggregationFunctions != null && aggregationFunctions.length == 1
&& aggregationFunctions[0].getType() == AggregationFunctionType.IDSET
&& subquery.getGroupByExpressions() == null,
"Subquery in IN_PARTITIONED_SUBQUERY should be an ID_SET aggregation only query, found: %s",
@@ -628,8 +627,11 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
// Make a clone of indexSegments because the method might modify the list
DataTable dataTable =
processQuery(new ArrayList<>(indexSegments), subquery, timerContext, executorService, null, false);
- IdSet idSet = dataTable.getObject(0, 0);
- String serializedIdSet = idSet.toBase64String();
+ DataTable.CustomObject idSet = dataTable.getCustomObject(0, 0);
+ Preconditions.checkState(idSet != null && idSet.getType() == ObjectSerDeUtils.ObjectType.IdSet.getValue(),
+ "Result is not an IdSet");
+ String serializedIdSet =
+ new String(Base64.getEncoder().encode(idSet.getBuffer()).array(), StandardCharsets.ISO_8859_1);
// Rewrite the expression
function.setFunctionName(TransformFunctionType.INIDSET.name());
arguments.set(1, ExpressionContext.forLiteral(serializedIdSet));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 186bea1748..72507411ab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -66,7 +67,9 @@ public class DistinctDataTableReducer implements DataTableReducer {
// Gather all non-empty DistinctTables
List<DistinctTable> nonEmptyDistinctTables = new ArrayList<>(dataTableMap.size());
for (DataTable dataTable : dataTableMap.values()) {
- DistinctTable distinctTable = dataTable.getObject(0, 0);
+ DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+ assert customObject != null;
+ DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
if (!distinctTable.isEmpty()) {
nonEmptyDistinctTables.add(distinctTable);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index f04d5b9724..1e2d6c18b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.Record;
@@ -313,7 +314,11 @@ public class GroupByDataTableReducer implements DataTableReducer {
values[colId] = dataTable.getBytes(rowId, colId);
break;
case OBJECT:
- values[colId] = dataTable.getObject(rowId, colId);
+ // TODO: Move ser/de into AggregationFunction interface
+ DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
+ if (customObject != null) {
+ values[colId] = ObjectSerDeUtils.deserialize(customObject);
+ }
break;
// Add other aggregation intermediate result / group-by column type supports here
default:
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index e56ccc422b..a85ca126db 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -144,7 +144,7 @@ public class DataBlockTestUtils {
case BYTES:
return dataBlock.getBytes(rowId, colId);
case OBJECT:
- return dataBlock.getObject(rowId, colId);
+ return dataBlock.getCustomObject(rowId, colId);
case BOOLEAN_ARRAY:
case INT_ARRAY:
return dataBlock.getIntArray(rowId, colId);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index cbb3c34992..57b873a3bb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
@@ -85,15 +86,15 @@ public class DataTableSerDeTest {
.put(MetadataKey.TOTAL_DOCS.getName(), String.valueOf(200L))
.put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true")
.put(MetadataKey.TIME_USED_MS.getName(), String.valueOf(20000L)).put(MetadataKey.TRACE_INFO.getName(),
- "StudentException: Error finding students\n"
- + " at StudentManager.findStudents(StudentManager.java:13)\n"
- + " at StudentProgram.main(StudentProgram.java:9)\n"
- + "Caused by: DAOException: Error querying students from database\n"
- + " at StudentDAO.list(StudentDAO.java:11)\n"
- + " at StudentManager.findStudents(StudentManager.java:11)\n" + " ... 1 more\n"
- + "Caused by: java.sql.SQLException: Syntax Error\n"
- + " at DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n"
- + " at StudentDAO.list(StudentDAO.java:8)\n" + " ... 2 more")
+ "StudentException: Error finding students\n"
+ + " at StudentManager.findStudents(StudentManager.java:13)\n"
+ + " at StudentProgram.main(StudentProgram.java:9)\n"
+ + "Caused by: DAOException: Error querying students from database\n"
+ + " at StudentDAO.list(StudentDAO.java:11)\n"
+ + " at StudentManager.findStudents(StudentManager.java:11)\n" + " ... 1 more\n"
+ + "Caused by: java.sql.SQLException: Syntax Error\n"
+ + " at DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n"
+ + " at StudentDAO.list(StudentDAO.java:8)\n" + " ... 2 more")
.put(MetadataKey.REQUEST_ID.getName(), String.valueOf(90181881818L))
.put(MetadataKey.NUM_RESIZES.getName(), String.valueOf(900L))
.put(MetadataKey.RESIZE_TIME_MS.getName(), String.valueOf(1919199L)).build();
@@ -703,8 +704,8 @@ public class DataTableSerDeTest {
Assert.assertEquals(newDataTable.getDouble(rowId, colId), isNull ? 0.0 : DOUBLES[rowId], ERROR_MESSAGE);
break;
case BIG_DECIMAL:
- Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId), isNull ? BigDecimal.ZERO
- : BIG_DECIMALS[rowId], ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getBigDecimal(rowId, colId),
+ isNull ? BigDecimal.ZERO : BIG_DECIMALS[rowId], ERROR_MESSAGE);
break;
case BOOLEAN:
Assert.assertEquals(newDataTable.getInt(rowId, colId), isNull ? 0 : BOOLEANS[rowId], ERROR_MESSAGE);
@@ -723,7 +724,13 @@ public class DataTableSerDeTest {
ERROR_MESSAGE);
break;
case OBJECT:
- Assert.assertEquals(newDataTable.getObject(rowId, colId), isNull ? null : OBJECTS[rowId], ERROR_MESSAGE);
+ DataTable.CustomObject customObject = newDataTable.getCustomObject(rowId, colId);
+ if (isNull) {
+ Assert.assertNull(customObject, ERROR_MESSAGE);
+ } else {
+ Assert.assertNotNull(customObject);
+ Assert.assertEquals(ObjectSerDeUtils.deserialize(customObject), OBJECTS[rowId], ERROR_MESSAGE);
+ }
break;
case INT_ARRAY:
Assert.assertTrue(Arrays.equals(newDataTable.getIntArray(rowId, colId), INT_ARRAYS[rowId]), ERROR_MESSAGE);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
index e92789ea48..9cd35843c9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableUtilsTest.java
@@ -22,13 +22,14 @@ import java.io.IOException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
public class DataTableUtilsTest {
@@ -75,9 +76,9 @@ public class DataTableUtilsTest {
assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"});
assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.OBJECT});
assertEquals(dataTable.getNumberOfRows(), 1);
- Object firstObject = dataTable.getObject(0, 0);
- assertTrue(firstObject instanceof DistinctTable);
- DistinctTable distinctTable = (DistinctTable) firstObject;
+ DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
+ assertNotNull(customObject);
+ DistinctTable distinctTable = ObjectSerDeUtils.deserialize(customObject);
assertEquals(distinctTable.size(), 0);
assertEquals(distinctTable.getDataSchema().getColumnNames(), new String[]{"a", "b"});
assertEquals(distinctTable.getDataSchema().getColumnDataTypes(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org