You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/06/14 22:34:45 UTC
[pinot] branch master updated: Use multi-stage DataBlock as DataTableImplV4 (#8874)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c802786ea9 Use multi-stage DataBlock as DataTableImplV4 (#8874)
c802786ea9 is described below
commit c802786ea95cff67b83ff4d24f796b965e565854
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Jun 14 15:34:40 2022 -0700
Use multi-stage DataBlock as DataTableImplV4 (#8874)
multi-stage DataBlock as DataTableImplV4
* refactor data blocks into pinot-core
* do not allow experimental data table version setting via server config
* add multi-stage query engine integration test.
* address diff comments
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../core/common/datablock}/BaseDataBlock.java | 2 +-
.../core/common/datablock}/ColumnarDataBlock.java | 2 +-
.../core/common/datablock}/DataBlockBuilder.java | 5 +-
.../core/common/datablock}/DataBlockUtils.java | 21 +---
.../core/common/datablock}/MetadataBlock.java | 2 +-
.../pinot/core/common/datablock}/RowDataBlock.java | 2 +-
.../pinot/core/common/datatable/BaseDataTable.java | 7 +-
.../core/common/datatable/DataTableBuilder.java | 69 ++++++----
.../core/common/datatable/DataTableFactory.java | 2 +
.../core/common/datatable/DataTableImplV2.java | 7 +-
.../core/common/datatable/DataTableImplV3.java | 7 +-
...{DataTableFactory.java => DataTableImplV4.java} | 39 +++---
.../core/common/datatable/DataTableUtils.java | 10 +-
.../core/common/datablock}/DataBlockTest.java | 51 +++++++-
.../core/common/datablock}/DataBlockTestUtils.java | 2 +-
.../core/common/datatable/DataTableSerDeTest.java | 134 ++++++++++++++++++++
.../tests/MultiStageEngineIntegrationTest.java | 139 +++++++++++++++++++++
.../apache/pinot/query/runtime/QueryRunner.java | 7 +-
.../query/runtime/blocks/TransferableBlock.java | 3 +
.../runtime/blocks/TransferableBlockUtils.java | 51 ++++++++
.../runtime/executor/WorkerQueryExecutor.java | 4 +-
.../query/runtime/operator/HashJoinOperator.java | 13 +-
.../runtime/operator/MailboxReceiveOperator.java | 7 +-
.../runtime/operator/MailboxSendOperator.java | 9 +-
.../pinot/query/service/QueryDispatcher.java | 6 +-
.../query/mailbox/GrpcMailboxServiceTest.java | 2 +-
.../server/starter/helix/BaseServerStarter.java | 9 +-
27 files changed, 510 insertions(+), 102 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
similarity index 99%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 97d007b58b..607fdef8bd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
similarity index 98%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
index 8510043251..1d2429f118 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
similarity index 98%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 0c456c1d85..ff6cc4e43c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.spi.utils.ArrayCopyUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
@@ -73,7 +72,7 @@ public class DataBlockBuilder {
}
} else if (_blockType == BaseDataBlock.Type.ROW) {
_columnOffsets = new int[_numColumns];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+ _rowSizeInBytes = DataBlockUtils.computeColumnOffsets(dataSchema, _columnOffsets);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
similarity index 85%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
index 690be1353a..f1dedd49d0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,11 +37,6 @@ public final class DataBlockUtils {
static {
EOS_DATA_BLOCK._metadata.put(DataTable.MetadataKey.TABLE.getName(), "END_OF_STREAM");
}
- private static final TransferableBlock EOS_TRANSFERABLE_BLOCK = new TransferableBlock(EOS_DATA_BLOCK);
-
- public static TransferableBlock getEndOfStreamTransferableBlock() {
- return EOS_TRANSFERABLE_BLOCK;
- }
public static MetadataBlock getEndOfStreamDataBlock() {
return EOS_DATA_BLOCK;
@@ -58,24 +53,10 @@ public final class DataBlockUtils {
return errorBlock;
}
- public static TransferableBlock getErrorTransferableBlock(Exception e) {
- return new TransferableBlock(getErrorDataBlock(e));
- }
-
public static MetadataBlock getEmptyDataBlock(DataSchema dataSchema) {
return dataSchema == null ? EOS_DATA_BLOCK : new MetadataBlock(dataSchema);
}
- public static TransferableBlock getEmptyTransferableBlock(DataSchema dataSchema) {
- return new TransferableBlock(getEmptyDataBlock(dataSchema));
- }
-
- public static boolean isEndOfStream(TransferableBlock transferableBlock) {
- return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
- && "END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
- .get(DataTable.MetadataKey.TABLE.getName()));
- }
-
public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
throws IOException {
int versionType = byteBuffer.getInt();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
similarity index 97%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index a3ffc7a126..9be572d938 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
similarity index 98%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
rename to pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index f8520c54f6..bece1a7eaf 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 66f5086a01..788fe62765 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -58,7 +58,7 @@ public abstract class BaseDataTable implements DataTable {
_numColumns = dataSchema.size();
_dataSchema = dataSchema;
_columnOffsets = new int[_numColumns];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+ _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets, getVersion());
_dictionaryMap = dictionaryMap;
_fixedSizeDataBytes = fixedSizeDataBytes;
_fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
@@ -84,6 +84,11 @@ public abstract class BaseDataTable implements DataTable {
_metadata = new HashMap<>();
}
+ /**
+ * get the current data table version.
+ */
+ public abstract int getVersion();
+
/**
* Helper method to serialize dictionary map.
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index a0b046a813..05f29bd774 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -73,14 +73,20 @@ import org.apache.pinot.spi.utils.ByteArray;
*
*
*/
-// TODO: potential optimizations:
-// TODO: 1. Fix float size.
-// TODO: 2. Use one dictionary for all columns (save space).
-// TODO: 3. Given a data schema, write all values one by one instead of using rowId and colId to position (save time).
-// TODO: 4. Store bytes as variable size data instead of String
+// TODO: potential optimizations (DataTableV3 and below)
+// TODO: - Fix float size.
+// TODO: note: (Fixed in V4, remove this comment once V3 is deprecated)
+// TODO: - Given a data schema, write all values one by one instead of using rowId and colId to position (save time).
+// TODO: note: (Fixed in V4, remove this comment once V3 is deprecated)
+// TODO: - Store bytes as variable size data instead of String
+// TODO: note: (Fixed in V4, remove this comment once V3 is deprecated)
+ // TODO: - use a builder factory pattern for different versions so that there is no version check per build.
+// TODO: - Use one dictionary for all columns (save space).
+
public class DataTableBuilder {
public static final int VERSION_2 = 2;
public static final int VERSION_3 = 3;
+ public static final int VERSION_4 = 4;
private static int _version = VERSION_3;
private final DataSchema _dataSchema;
private final int[] _columnOffsets;
@@ -98,15 +104,24 @@ public class DataTableBuilder {
public DataTableBuilder(DataSchema dataSchema) {
_dataSchema = dataSchema;
_columnOffsets = new int[dataSchema.size()];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets);
+ _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets, _version);
}
public static DataTable getEmptyDataTable() {
- return _version == VERSION_2 ? new DataTableImplV2() : new DataTableImplV3();
+ switch (_version) {
+ case VERSION_2:
+ return new DataTableImplV2();
+ case VERSION_3:
+ return new DataTableImplV3();
+ case VERSION_4:
+ return new DataTableImplV4();
+ default:
+ throw new IllegalStateException("Unexpected value: " + _version);
+ }
}
public static void setCurrentDataTableVersion(int version) {
- if (version != VERSION_2 && version != VERSION_3) {
+ if (version != VERSION_2 && version != VERSION_3 && version != VERSION_4) {
throw new IllegalArgumentException("Unsupported version: " + version);
}
_version = version;
@@ -191,19 +206,16 @@ public class DataTableBuilder {
public void setColumn(int colId, ByteArray value)
throws IOException {
- // NOTE: Use String to store bytes value in DataTable V2 for backward-compatibility
- setColumn(colId, value.toHexString());
-
- /*
- TODO: Store bytes as variable size data instead of String. Make the change for the next version data table for
- backward-compatibility
-
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- byte[] bytes = value.getBytes();
- _currentRowDataByteBuffer.putInt(bytes.length);
- _variableSizeDataByteArrayOutputStream.write(bytes);
- */
+ if (_version >= 4) {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+ _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ byte[] bytes = value.getBytes();
+ _currentRowDataByteBuffer.putInt(bytes.length);
+ _variableSizeDataByteArrayOutputStream.write(bytes);
+ } else {
+ // NOTE: Use String to store bytes value in DataTable V2 for backward-compatibility
+ setColumn(colId, value.toHexString());
+ }
}
public void setColumn(int colId, Object value)
@@ -288,9 +300,18 @@ public class DataTableBuilder {
}
public DataTable build() {
- return _version == VERSION_2 ? new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap,
- _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray())
- : new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap,
+ switch (_version) {
+ case VERSION_2:
+ return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap,
_fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray());
+ case VERSION_3:
+ return new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap,
+ _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray());
+ case VERSION_4:
+ return new DataTableImplV4(_numRows, _dataSchema, _reverseDictionaryMap,
+ _fixedSizeDataByteArrayOutputStream.toByteArray(), _variableSizeDataByteArrayOutputStream.toByteArray());
+ default:
+ throw new IllegalStateException("Unexpected value: " + _version);
+ }
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
index 54e305edbc..4a39f110d9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
@@ -35,6 +35,8 @@ public class DataTableFactory {
return new DataTableImplV2(byteBuffer);
case DataTableBuilder.VERSION_3:
return new DataTableImplV3(byteBuffer);
+ case DataTableBuilder.VERSION_4:
+ return new DataTableImplV4(byteBuffer);
default:
throw new UnsupportedOperationException("Unsupported data table version: " + version);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
index 1340b45e19..af3665a48e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
@@ -92,7 +92,7 @@ public class DataTableImplV2 extends BaseDataTable {
byteBuffer.position(dataSchemaStart);
_dataSchema = DataSchema.fromBytes(byteBuffer);
_columnOffsets = new int[_dataSchema.size()];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+ _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion());
} else {
_dataSchema = null;
_columnOffsets = null;
@@ -122,6 +122,11 @@ public class DataTableImplV2 extends BaseDataTable {
}
}
+ @Override
+ public int getVersion() {
+ return DataTableBuilder.VERSION_2;
+ }
+
private Map<String, String> deserializeMetadata(ByteBuffer buffer)
throws IOException {
int numEntries = buffer.getInt();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
index 7838c362fe..edaf1d5479 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -128,7 +128,7 @@ public class DataTableImplV3 extends BaseDataTable {
byteBuffer.position(dataSchemaStart);
_dataSchema = DataSchema.fromBytes(byteBuffer);
_columnOffsets = new int[_dataSchema.size()];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets);
+ _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion());
} else {
_dataSchema = null;
_columnOffsets = null;
@@ -164,6 +164,11 @@ public class DataTableImplV3 extends BaseDataTable {
}
}
+ @Override
+ public int getVersion() {
+ return DataTableBuilder.VERSION_3;
+ }
+
@Override
public void addException(ProcessingException processingException) {
_errCodeToExceptionMap.put(processingException.getErrorCode(), processingException.getMessage());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
similarity index 53%
copy from pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
copy to pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
index 54e305edbc..fe32ac84a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
@@ -16,32 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pinot.core.common.datatable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataTable;
+import java.util.Map;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datablock.RowDataBlock;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+
+/**
+ * Datatable V4 Implementation is a wrapper around the Row-based data block.
+ */
+@InterfaceStability.Evolving
+public class DataTableImplV4 extends RowDataBlock {
-public class DataTableFactory {
- private DataTableFactory() {
+ public DataTableImplV4() {
+ super();
}
- public static DataTable getDataTable(ByteBuffer byteBuffer)
+ public DataTableImplV4(ByteBuffer byteBuffer)
throws IOException {
- int version = byteBuffer.getInt();
- switch (version) {
- case DataTableBuilder.VERSION_2:
- return new DataTableImplV2(byteBuffer);
- case DataTableBuilder.VERSION_3:
- return new DataTableImplV3(byteBuffer);
- default:
- throw new UnsupportedOperationException("Unsupported data table version: " + version);
- }
+ super(byteBuffer);
}
- public static DataTable getDataTable(byte[] bytes)
- throws IOException {
- return getDataTable(ByteBuffer.wrap(bytes));
+ public DataTableImplV4(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap,
+ byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+ super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes);
+ }
+
+ @Override
+ protected int getDataBlockVersionType() {
+ return DataTableBuilder.VERSION_4;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
index 684627f8f4..1430869f95 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
@@ -56,7 +56,7 @@ public class DataTableUtils {
* @param columnOffsets array of column offsets.
* @return row size in bytes.
*/
- public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets) {
+ public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets, int dataTableVersion) {
int numColumns = columnOffsets.length;
assert numColumns == dataSchema.size();
@@ -71,10 +71,12 @@ public class DataTableUtils {
case LONG:
rowSizeInBytes += 8;
break;
- // TODO: fix float size (should be 4).
- // For backward compatible, DON'T CHANGE.
case FLOAT:
- rowSizeInBytes += 8;
+ if (dataTableVersion >= 4) {
+ rowSizeInBytes += 4;
+ } else {
+ rowSizeInBytes += 8;
+ }
break;
case DOUBLE:
rowSizeInBytes += 8;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
similarity index 57%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index bf33160b92..4ede7f2cd7 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -16,19 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
public class DataBlockTest {
- private static final int TEST_ROW_COUNT = 2;
+ private static final List<DataSchema.ColumnDataType> EXCLUDE_DATA_TYPES = ImmutableList.of(
+ DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.BYTES_ARRAY);
+ private static final int TEST_ROW_COUNT = 5;
@Test
public void testException()
@@ -51,9 +59,46 @@ public class DataBlockTest {
originalException.getMessage());
}
+ /**
+ * This test is only here to ensure that {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}
+ * producing data table can actually be wrapped and sent via mailbox in the {@link RowDataBlock} format.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRowDataBlockCompatibleWithDataTableV4()
+ throws Exception {
+ DataSchema.ColumnDataType[] allDataTypes = DataSchema.ColumnDataType.values();
+ List<DataSchema.ColumnDataType> columnDataTypes = new ArrayList<DataSchema.ColumnDataType>();
+ List<String> columnNames = new ArrayList<String>();
+ for (int i = 0; i < allDataTypes.length; i++) {
+ if (!EXCLUDE_DATA_TYPES.contains(allDataTypes[i])) {
+ columnNames.add(allDataTypes[i].name());
+ columnDataTypes.add(allDataTypes[i]);
+ }
+ }
+
+ DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
+ columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
+ List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT);
+ DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+ DataTable dataTableImpl = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
+ DataTable dataBlockFromDataTable = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
+
+ for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
+ Object[] rowFromDataTable = SelectionOperatorUtils.extractRowFromDataTable(dataTableImpl, rowId);
+ Object[] rowFromBlock = SelectionOperatorUtils.extractRowFromDataTable(dataBlockFromDataTable, rowId);
+ for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
+ Assert.assertEquals(rowFromBlock[colId], rowFromDataTable[colId], "Error comparing Row/Column Block "
+ + " at (" + rowId + "," + colId + ") of Type: " + dataSchema.getColumnDataType(colId) + "! "
+ + " from DataBlock: [" + rowFromBlock[rowId] + "], from DataTable: [" + rowFromDataTable[colId] + "]");
+ }
+ }
+ }
+
@Test
public void testAllDataTypes()
- throws IOException {
+ throws Exception {
DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
int numColumns = columnDataTypes.length;
String[] columnNames = new String[numColumns];
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
similarity index 99%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
rename to pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index caa9b55ccf..54f046d13f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.blocks;
+package org.apache.pinot.core.common.datablock;
import java.math.BigDecimal;
import java.util.ArrayList;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index be2df6f749..8dbc3910f6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -163,6 +163,139 @@ public class DataTableSerDeTest {
verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
}
+ @Test
+ public void testV3V4Compatibility()
+ throws IOException {
+ DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
+ int numColumns = columnDataTypes.length;
+ String[] columnNames = new String[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ columnNames[i] = columnDataTypes[i].name();
+ }
+
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+ // TODO: verify data table compatibility across multi-stage and normal query engine.
+ // TODO: see https://github.com/apache/pinot/pull/8874/files#r894806085
+
+ // Verify V4 broker can deserialize data table (has data, but has no metadata) sent by a V3 server
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+ DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
+ DataTableBuilder dataTableBuilderV3WithDataOnly = new DataTableBuilder(dataSchema);
+ fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
+
+ DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
+ DataTable newDataTable =
+ DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ Assert.assertEquals(newDataTable.getMetadata().size(), 0);
+
+ // Verify V4 broker can deserialize data table (has data and metadata) sent by a V3 server
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V4 broker can deserialize data table (only has metadata) sent by a V3 server
+ DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new DataTableBuilder(dataSchema);
+ dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V4 broker can deserialize data table (has data, but has no metadata) sent by a V4 server (with ThreadCpuTimeMeasurement
+ // disabled)
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+ DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+ DataTableBuilder dataTableBuilderV4WithDataOnly = new DataTableBuilder(dataSchema);
+ fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns);
+ DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
+ // Deserialize data table bytes as V4
+ newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ Assert.assertEquals(newDataTable.getMetadata().size(), 0);
+
+ // Verify V4 broker can deserialize data table (has data and metadata) sent by a V4 server (with
+ // ThreadCpuTimeMeasurement disabled)
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ // Deserialize data table bytes as V4
+ newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V4 broker can deserialize data table (only has metadata) send by V4 server(with
+ // ThreadCpuTimeMeasurement disabled)
+ DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = new DataTableBuilder(dataSchema);
+ dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement
+ // enabled)
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+ DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+ dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
+ // Deserialize data table bytes as V4
+ newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ Assert.assertEquals(newDataTable.getMetadata().size(), 1);
+ Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
+
+ // Verify V4 broker can deserialize data table (has data and metadata) send by V4 server(with
+ // ThreadCpuTimeMeasurement enabled)
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ // Deserialize data table bytes as V4
+ newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+ Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
+ newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+ }
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V4 broker can deserialize data table (only has metadata) send by V4 server(with
+ // ThreadCpuTimeMeasurement enabled)
+ dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
+ if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+ Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
+ newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+ }
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+ }
+
@Test
public void testV2V3Compatibility()
throws IOException {
@@ -176,6 +309,7 @@ public class DataTableSerDeTest {
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
// Verify V3 broker can deserialize data table (has data, but has no metadata) send by V2 server
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_2);
DataTableBuilder dataTableBuilderV2WithDataOnly = new DataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, columnDataTypes, numColumns);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
new file mode 100644
index 0000000000..8bbf28e1ee
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for the multi-stage query engine. Starts a full Pinot cluster
+ * (ZK, controller, broker, server) with the multi-stage engine enabled on both
+ * broker and server, loads the standard airline Avro data set as an offline table,
+ * and runs SQL (including a self-join) through the broker's /query/sql endpoint
+ * with {@code useMultistageEngine=true}.
+ */
+public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest {
+ private static final String SCHEMA_FILE_NAME =
+ "On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
+
+ @Override
+ protected String getSchemaFileName() {
+ return SCHEMA_FILE_NAME;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig();
+ addTableConfig(tableConfig);
+
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
+
+ // Initialize the query generator
+ setUpQueryGenerator(avroFiles);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+
+ // Force data table version to 4 so server responses use the DataBlock-based V4 format
+ DataTableBuilder.setCurrentDataTableVersion(4);
+ }
+
+ // Enable the multi-stage engine on the broker and pin the query-runner port.
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
+ brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421);
+ }
+
+ // Enable the multi-stage engine on the server and pin the query-server/runner ports
+ // (runner port differs from the broker's to avoid a clash on the same host).
+ @Override
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
+ serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842);
+ serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422);
+ }
+
+ /**
+ * Sends {@code sql} to the broker with {@code useMultistageEngine=true} and asserts
+ * the result table's row count and the column count of the first row.
+ *
+ * @param sql SQL statement to execute through the multi-stage engine
+ * @param expectedNumOfRows expected number of rows in the result table
+ * @param expectedNumOfColumns expected number of columns in the first result row
+ */
+ @Test(dataProvider = "multiStageQueryEngineSqlTestSet")
+ public void testMultiStageQuery(String sql, int expectedNumOfRows, int expectedNumOfColumns)
+ throws IOException {
+ JsonNode multiStageResponse = JsonUtils.stringToJsonNode(
+ sendPostRequest(_brokerBaseApiUrl + "/query/sql", "{\"useMultistageEngine\": true, \"sql\":\"" + sql + "\"}"));
+ Assert.assertTrue(multiStageResponse.has("resultTable"));
+ ArrayNode jsonNode = (ArrayNode) multiStageResponse.get("resultTable").get("rows");
+ // TODO: assert actual result data payload.
+ Assert.assertEquals(jsonNode.size(), expectedNumOfRows);
+ Assert.assertEquals(jsonNode.get(0).size(), expectedNumOfColumns);
+ }
+
+ // Each case: {sql, expectedNumOfRows, expectedNumOfColumns}.
+ @DataProvider
+ public Object[][] multiStageQueryEngineSqlTestSet() {
+ return new Object[][] {
+ new Object[]{"SELECT * FROM mytable_OFFLINE", 10, 73},
+ new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE CarrierDelay=15 AND ArrDelay>20", 10, 2},
+ new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.AirlineID = b.AirlineID "
+ + " WHERE a.CarrierDelay=15 AND a.ArrDelay>20 AND b.ArrDelay<20", 10, 146}
+ };
+ }
+
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ // Restore data table version to the default (3) so other tests are unaffected
+ DataTableBuilder.setCurrentDataTableVersion(3);
+
+ dropOfflineTable(DEFAULT_TABLE_NAME);
+
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+
+ FileUtils.deleteDirectory(_tempDir);
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index e25c6e03b8..cf56c5b786 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -34,8 +36,6 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -101,7 +101,8 @@ public class QueryRunner {
BaseDataBlock dataBlock;
try {
DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
- // this works because default DataTableImplV3 will have ordinal 0, which maps to ROW(0)
+ // this works because default DataTableImplV3 will have a version number at beginning,
+ // which maps to ROW type of version 3.
dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
} catch (IOException e) {
throw new RuntimeException("Unable to convert byte buffer", e);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 9284011f06..ae64cc90bb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -24,6 +24,9 @@ import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.ColumnarDataBlock;
+import org.apache.pinot.core.common.datablock.RowDataBlock;
/**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
new file mode 100644
index 0000000000..f2992c41df
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
+
+
+/**
+ * Static helpers for creating and inspecting the {@link TransferableBlock}s exchanged
+ * between multi-stage query operators: end-of-stream, error and empty blocks.
+ */
+public final class TransferableBlockUtils {
+ private TransferableBlockUtils() {
+ // do not instantiate.
+ }
+ // Single shared EOS sentinel, reused for every stream.
+ private static final TransferableBlock EOS_TRANSFERABLE_BLOCK =
+ new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+
+ /** Returns the shared end-of-stream sentinel block. */
+ public static TransferableBlock getEndOfStreamTransferableBlock() {
+ return EOS_TRANSFERABLE_BLOCK;
+ }
+
+ /** Wraps {@code e} in an error data block so the failure can propagate downstream. */
+ public static TransferableBlock getErrorTransferableBlock(Exception e) {
+ return new TransferableBlock(DataBlockUtils.getErrorDataBlock(e));
+ }
+
+ /** Returns a block carrying the given schema but no rows. */
+ public static TransferableBlock getEmptyTransferableBlock(DataSchema dataSchema) {
+ return new TransferableBlock(DataBlockUtils.getEmptyDataBlock(dataSchema));
+ }
+
+ /**
+ * True when the block is a METADATA block whose TABLE metadata entry equals
+ * "END_OF_STREAM" -- the convention used here to signal stream completion.
+ */
+ public static boolean isEndOfStream(TransferableBlock transferableBlock) {
+ return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA)
+ && "END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata()
+ .get(DataTable.MetadataKey.TABLE.getName()));
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index d8ec43e040..01a97643ab 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -36,8 +36,8 @@ import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -88,7 +88,7 @@ public class WorkerQueryExecutor {
@Override
public void runJob() {
ThreadTimer executionThreadTimer = new ThreadTimer();
- while (!DataBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
+ while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
LOGGER.debug("Result Block acquired");
}
LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ff99dae5ed..ae1bf43d4c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -25,14 +25,15 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
/**
@@ -86,14 +87,14 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
try {
return new TransferableBlock(buildJoinedDataBlock(_leftTableOperator.nextBlock()));
} catch (Exception e) {
- return DataBlockUtils.getErrorTransferableBlock(e);
+ return TransferableBlockUtils.getErrorTransferableBlock(e);
}
}
private void buildBroadcastHashTable() {
if (!_isHashTableBuilt) {
TransferableBlock rightBlock = _rightTableOperator.nextBlock();
- while (!DataBlockUtils.isEndOfStream(rightBlock)) {
+ while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
BaseDataBlock dataBlock = rightBlock.getDataBlock();
_rightTableSchema = dataBlock.getDataSchema();
int numRows = dataBlock.getNumberOfRows();
@@ -112,7 +113,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private BaseDataBlock buildJoinedDataBlock(TransferableBlock block)
throws Exception {
- if (DataBlockUtils.isEndOfStream(block)) {
+ if (TransferableBlockUtils.isEndOfStream(block)) {
return DataBlockUtils.getEndOfStreamDataBlock();
}
List<Object[]> rows = new ArrayList<>();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 37a203f815..0b8d88cd00 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -25,14 +25,15 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,7 +117,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
}
// TODO: we need to at least return one data table with schema if there's no error.
// we need to condition this on whether there's already things being returned or not.
- return DataBlockUtils.getEndOfStreamTransferableBlock();
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
public RelDistribution.Type getExchangeType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 3681acdc7d..8b5ccd5b99 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -30,6 +30,9 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
@@ -37,10 +40,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
if (_dataTableBlockBaseOperator != null) {
transferableBlock = _dataTableBlockBaseOperator.nextBlock();
dataTable = transferableBlock.getDataBlock();
- isEndOfStream = DataBlockUtils.isEndOfStream(transferableBlock);
+ isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
} else {
dataTable = _dataTable;
isEndOfStream = true;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 260c083ea7..f0236eb7ff 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -29,14 +29,14 @@ import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
@@ -117,7 +117,7 @@ public class QueryDispatcher {
TransferableBlock transferableBlock;
while (true) {
transferableBlock = mailboxReceiveOperator.nextBlock();
- if (DataBlockUtils.isEndOfStream(transferableBlock)) {
+ if (TransferableBlockUtils.isEndOfStream(transferableBlock)) {
break;
}
if (transferableBlock.getDataBlock() != null) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index e254778db1..07c69a05ac 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -24,7 +24,7 @@ import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.common.proto.Mailbox;
-import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
+import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index dd405c898d..8bef837793 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -180,8 +180,13 @@ public abstract class BaseServerStarter implements ServiceStartable {
Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
// Set data table version send to broker.
- DataTableBuilder.setCurrentDataTableVersion(_serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION,
- Server.DEFAULT_CURRENT_DATA_TABLE_VERSION));
+ int dataTableVersion =
+ _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
+ if (dataTableVersion > Server.DEFAULT_CURRENT_DATA_TABLE_VERSION) {
+ throw new UnsupportedOperationException("Setting experimental DataTable version newer than default via config "
+ + "is not allowed. Current default DataTable version: " + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
+ }
+ DataTableBuilder.setCurrentDataTableVersion(dataTableVersion);
LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: {}, instanceId: {}", _zkAddress,
_helixClusterName, _instanceId);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org