You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/05 19:05:40 UTC
[pinot] branch master updated: support partial operator chain execution (#9711)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aa013a432a support partial operator chain execution (#9711)
aa013a432a is described below
commit aa013a432a3464e658398326a3a98ff045fcfc9a
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Sat Nov 5 12:05:32 2022 -0700
support partial operator chain execution (#9711)
This is the first PR in a series of PRs to improve our execution model. It implements "partial execution" of operator chains by allowing them to return a "noop" `MetadataBlock` when there is either no data to process or no data ready to output yet.
This PR introduces no functional change: `WorkerQueryExecutor` does not yet take advantage of partial execution — whenever it receives a noop block, it simply calls `operator#nextBlock` again.
---
.../pinot/common/datablock/DataBlockUtils.java | 10 +-
.../pinot/common/datablock/MetadataBlock.java | 110 ++++++++++++++++--
.../pinot/common/datablock/MetadataBlockTest.java | 128 +++++++++++++++++++++
.../pinot/core/common/datablock/DataBlockTest.java | 1 -
.../apache/pinot/query/runtime/QueryRunner.java | 2 +-
.../query/runtime/blocks/TransferableBlock.java | 31 +++--
.../runtime/blocks/TransferableBlockUtils.java | 13 ++-
.../runtime/executor/WorkerQueryExecutor.java | 12 +-
.../query/runtime/operator/AggregateOperator.java | 103 +++++++++--------
.../query/runtime/operator/FilterOperator.java | 24 ++--
.../query/runtime/operator/HashJoinOperator.java | 91 ++++++++-------
.../runtime/operator/LiteralValueOperator.java | 2 +-
.../runtime/operator/MailboxReceiveOperator.java | 64 ++++++-----
.../runtime/operator/MailboxSendOperator.java | 15 ++-
.../pinot/query/runtime/operator/SortOperator.java | 30 +++--
.../query/runtime/operator/TransformOperator.java | 32 +++---
.../apache/pinot/query/service/QueryConfig.java | 2 +-
.../pinot/query/service/QueryDispatcher.java | 31 +++--
.../query/mailbox/InMemoryMailboxServiceTest.java | 2 +-
.../pinot/query/runtime/QueryRunnerTest.java | 2 +-
.../query/runtime/TransferableBlockUtilsTest.java | 2 +-
.../runtime/operator/AggregateOperatorTest.java | 3 +
.../runtime/operator/HashJoinOperatorTest.java | 15 ++-
.../query/runtime/operator/OperatorTestUtil.java | 6 +-
24 files changed, 511 insertions(+), 220 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index f3cf116142..1dc8da3747 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -47,16 +47,20 @@ public final class DataBlockUtils {
}
public static MetadataBlock getErrorDataBlock(Map<Integer, String> exceptions) {
- MetadataBlock errorBlock = new MetadataBlock();
+ MetadataBlock errorBlock = new MetadataBlock(MetadataBlock.MetadataBlockType.ERROR);
for (Map.Entry<Integer, String> exception : exceptions.entrySet()) {
errorBlock.addException(exception.getKey(), exception.getValue());
}
return errorBlock;
}
- public static MetadataBlock getEndOfStreamDataBlock(DataSchema dataSchema) {
+ public static MetadataBlock getEndOfStreamDataBlock() {
// TODO: add query statistics metadata for the block.
- return new MetadataBlock(dataSchema);
+ return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
+ }
+
+ public static MetadataBlock getNoOpBlock() {
+ return new MetadataBlock(MetadataBlock.MetadataBlockType.NOOP);
}
public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
index 8b063e4024..33f7ee965b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java
@@ -18,28 +18,118 @@
*/
package org.apache.pinot.common.datablock;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
/**
- * Wrapper for row-wise data table. It stores data in row-major format.
+ * A block type to indicate some metadata about the current processing state.
+ * For the different types of metadata blocks see {@link MetadataBlockType}.
*/
public class MetadataBlock extends BaseDataBlock {
- private static final int VERSION = 1;
- public MetadataBlock() {
- super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+ private static final ObjectMapper JSON = new ObjectMapper();
+
+ @VisibleForTesting
+ static final int VERSION = 1;
+
+ public enum MetadataBlockType {
+ /**
+ * Indicates that this block is the final block to be sent
+ * (End Of Stream) as part of an operator chain computation.
+ */
+ EOS,
+
+ /**
+ * An {@code ERROR} metadata block indicates that there was
+ * some error during computation. To retrieve the error that
+ * occurred, use {@link MetadataBlock#getExceptions()}
+ */
+ ERROR,
+
+ /**
+ * A {@code NOOP} metadata block can be sent at any point to
+ * and should be ignored by downstream - it is often used to
+ * indicate that the operator chain either has nothing to process
+ * or has processed data but is not yet ready to emit a result
+ * block.
+ */
+ NOOP;
+
+ MetadataBlockType() {
+ }
}
- public MetadataBlock(DataSchema dataSchema) {
- super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
+ /**
+ * Used to serialize the contents of the metadata block conveniently and in
+ * a backwards compatible way. Use JSON because the performance of metadata block
+ * SerDe should not be a bottleneck.
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ @VisibleForTesting
+ static class Contents {
+
+ private String _type;
+
+ @JsonCreator
+ public Contents(@JsonProperty("type") String type) {
+ _type = type;
+ }
+
+ @JsonCreator
+ public Contents() {
+ _type = null;
+ }
+
+ public String getType() {
+ return _type;
+ }
+
+ public void setType(String type) {
+ _type = type;
+ }
+ }
+
+ private final Contents _contents;
+
+ public MetadataBlock(MetadataBlockType type) {
+ super(0, null, new String[0], new byte[]{0}, toContents(new Contents(type.name())));
+ _contents = new Contents(type.name());
+ }
+
+ private static byte[] toContents(Contents type) {
+ try {
+ return JSON.writeValueAsBytes(type);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
}
public MetadataBlock(ByteBuffer byteBuffer)
throws IOException {
super(byteBuffer);
+ if (_variableSizeDataBytes != null) {
+ _contents = JSON.readValue(_variableSizeDataBytes, Contents.class);
+ } else {
+ _contents = new Contents();
+ }
+ }
+
+ public MetadataBlockType getType() {
+ String type = _contents.getType();
+
+ // if type is null, then we're reading a legacy block where we didn't encode any
+ // data. assume that it is an EOS block if there's no exceptions and an ERROR block
+ // otherwise
+ return type == null
+ ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
+ : MetadataBlockType.valueOf(type);
}
@Override
@@ -49,12 +139,12 @@ public class MetadataBlock extends BaseDataBlock {
@Override
protected int getOffsetInFixedBuffer(int rowId, int colId) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Metadata block uses JSON encoding for field access");
}
@Override
protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Metadata block uses JSON encoding for field access");
}
@Override
@@ -64,6 +154,6 @@ public class MetadataBlock extends BaseDataBlock {
@Override
public MetadataBlock toDataOnlyDataTable() {
- return new MetadataBlock(_dataSchema);
+ throw new UnsupportedOperationException();
}
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/datablock/MetadataBlockTest.java b/pinot-common/src/test/java/org/apache/pinot/common/datablock/MetadataBlockTest.java
new file mode 100644
index 0000000000..f350e2c3e6
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/datablock/MetadataBlockTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.common.datatable.DataTable;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+public class MetadataBlockTest {
+
+ @Test
+ public void shouldEncodeContentsAsJSON()
+ throws Exception {
+ // Given:
+ MetadataBlock.MetadataBlockType type = MetadataBlock.MetadataBlockType.EOS;
+
+ // When:
+ MetadataBlock metadataBlock = new MetadataBlock(type);
+
+ // Then:
+ byte[] expected = new ObjectMapper().writeValueAsBytes(new MetadataBlock.Contents("EOS"));
+ assertEquals(metadataBlock._variableSizeDataBytes, expected);
+ }
+
+ @Test
+ public void shouldDefaultToEosWithNoErrorsOnLegacyMetadataBlock()
+ throws IOException {
+ // Given:
+ // MetadataBlock used to be encoded without any data, we should make sure that
+ // during rollout or if server versions are mismatched that we can still handle
+ // the old format
+ OldMetadataBlock legacyBlock = new OldMetadataBlock();
+ byte[] bytes = legacyBlock.toBytes();
+
+ // When:
+ ByteBuffer buff = ByteBuffer.wrap(bytes);
+ buff.getInt(); // consume the version information before decoding
+ MetadataBlock metadataBlock = new MetadataBlock(buff);
+
+ // Then:
+ assertEquals(metadataBlock.getType(), MetadataBlock.MetadataBlockType.EOS);
+ }
+
+ @Test
+ public void shouldDefaultToErrorOnLegacyMetadataBlockWithErrors()
+ throws IOException {
+ // Given:
+ // MetadataBlock used to be encoded without any data, we should make sure that
+ // during rollout or if server versions are mismatched that we can still handle
+ // the old format
+ OldMetadataBlock legacyBlock = new OldMetadataBlock();
+ legacyBlock.addException(250, "timeout");
+ byte[] bytes = legacyBlock.toBytes();
+
+ // When:
+ ByteBuffer buff = ByteBuffer.wrap(bytes);
+ buff.getInt(); // consume the version information before decoding
+ MetadataBlock metadataBlock = new MetadataBlock(buff);
+
+ // Then:
+ assertEquals(metadataBlock.getType(), MetadataBlock.MetadataBlockType.ERROR);
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void shouldThrowExceptionWhenUsingReadMethods() {
+ // Given:
+ MetadataBlock block = new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
+
+ // When:
+ // (should through exception)
+ block.getInt(0, 0);
+ }
+
+ /**
+ * This is mostly just used as an internal serialization tool
+ */
+ private static class OldMetadataBlock extends BaseDataBlock {
+
+ public OldMetadataBlock() {
+ super(0, null, new String[0], new byte[0], new byte[0]);
+ }
+
+ @Override
+ protected int getDataBlockVersionType() {
+ return MetadataBlock.VERSION + (Type.METADATA.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+ }
+
+ @Override
+ protected int getOffsetInFixedBuffer(int rowId, int colId) {
+ return 0;
+ }
+
+ @Override
+ protected int positionOffsetInVariableBufferAndGetLength(int rowId, int colId) {
+ return 0;
+ }
+
+ @Override
+ public DataTable toMetadataOnlyDataTable() {
+ return null;
+ }
+
+ @Override
+ public DataTable toDataOnlyDataTable() {
+ return null;
+ }
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index 73c47e1b76..9ff0db3334 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -58,7 +58,6 @@ public class DataBlockTest {
BaseDataBlock dataBlock = DataBlockUtils.getErrorDataBlock(originalException);
dataBlock.addException(processingException);
- Assert.assertNull(dataBlock.getDataSchema());
Assert.assertEquals(dataBlock.getNumberOfRows(), 0);
// Assert processing exception and original exception both matches.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1a9a8cd88c..c409126b4a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -253,7 +253,7 @@ public class QueryRunner {
return new TransferableBlock(_baseDataBlocks.get(_currentIndex++));
} else {
_currentIndex = -1;
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_dataSchema));
+ return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
}
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 93508ffac6..856914b787 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,11 +18,11 @@
*/
package org.apache.pinot.query.runtime.blocks;
-import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.pinot.common.datablock.BaseDataBlock;
import org.apache.pinot.common.datablock.ColumnarDataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datablock.RowDataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Block;
@@ -41,23 +41,15 @@ public class TransferableBlock implements Block {
private final BaseDataBlock.Type _type;
private final DataSchema _dataSchema;
- private final boolean _isErrorBlock;
private final int _numRows;
private BaseDataBlock _dataBlock;
private List<Object[]> _container;
public TransferableBlock(List<Object[]> container, DataSchema dataSchema, BaseDataBlock.Type containerType) {
- this(container, dataSchema, containerType, false);
- }
-
- @VisibleForTesting
- TransferableBlock(List<Object[]> container, DataSchema dataSchema, BaseDataBlock.Type containerType,
- boolean isErrorBlock) {
_container = container;
_dataSchema = dataSchema;
_type = containerType;
- _isErrorBlock = isErrorBlock;
_numRows = _container.size();
}
@@ -66,7 +58,6 @@ public class TransferableBlock implements Block {
_dataSchema = dataBlock.getDataSchema();
_type = dataBlock instanceof ColumnarDataBlock ? BaseDataBlock.Type.COLUMNAR
: dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : BaseDataBlock.Type.METADATA;
- _isErrorBlock = !_dataBlock.getExceptions().isEmpty();
_numRows = _dataBlock.getNumberOfRows();
}
@@ -149,7 +140,14 @@ public class TransferableBlock implements Block {
* @return whether this block is the end of stream.
*/
public boolean isEndOfStreamBlock() {
- return _type == BaseDataBlock.Type.METADATA;
+ return isType(MetadataBlock.MetadataBlockType.ERROR) || isType(MetadataBlock.MetadataBlockType.EOS);
+ }
+
+ /**
+ * @return whether this block represents a NOOP block
+ */
+ public boolean isNoOpBlock() {
+ return isType(MetadataBlock.MetadataBlockType.NOOP);
}
/**
@@ -158,7 +156,16 @@ public class TransferableBlock implements Block {
* @return true if contains exception.
*/
public boolean isErrorBlock() {
- return _isErrorBlock;
+ return isType(MetadataBlock.MetadataBlockType.ERROR);
+ }
+
+ private boolean isType(MetadataBlock.MetadataBlockType type) {
+ if (_type != BaseDataBlock.Type.METADATA) {
+ return false;
+ }
+
+ MetadataBlock metadata = (MetadataBlock) _dataBlock;
+ return metadata.getType() == type;
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index 2934f74759..caa25e70b3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.pinot.common.datablock.BaseDataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.RowDataBlock;
-import org.apache.pinot.common.utils.DataSchema;
public final class TransferableBlockUtils {
@@ -34,8 +33,12 @@ public final class TransferableBlockUtils {
// do not instantiate.
}
- public static TransferableBlock getEndOfStreamTransferableBlock(DataSchema dataSchema) {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(dataSchema));
+ public static TransferableBlock getEndOfStreamTransferableBlock() {
+ return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+ }
+
+ public static TransferableBlock getNoOpTransferableBlock() {
+ return new TransferableBlock(DataBlockUtils.getNoOpBlock());
}
public static TransferableBlock getErrorTransferableBlock(Exception e) {
@@ -50,6 +53,10 @@ public final class TransferableBlockUtils {
return transferableBlock.isEndOfStreamBlock();
}
+ public static boolean isNoOpBlock(TransferableBlock transferableBlock) {
+ return transferableBlock.isNoOpBlock();
+ }
+
/**
* Split a block into multiple block so that each block size is within maxBlockSize.
* Currently, we only support split for row type dataBlock.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 2436061aa9..18bb2defa5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -79,11 +79,15 @@ public class WorkerQueryExecutor {
executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
- ThreadTimer executionThreadTimer = new ThreadTimer();
- while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
- LOGGER.debug("Result Block acquired");
+ try {
+ ThreadTimer executionThreadTimer = new ThreadTimer();
+ while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) {
+ LOGGER.debug("Result Block acquired");
+ }
+ LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
+ } catch (Exception e) {
+ LOGGER.error("Failed to execute query!", e);
}
- LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs());
}
});
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 56f2b9087b..3edd821b07 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -65,7 +65,9 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
private final Map<Key, Object>[] _groupByResultHolders;
private final Map<Key, Object[]> _groupByKeyHolder;
private TransferableBlock _upstreamErrorBlock;
- private boolean _isCumulativeBlockConstructed;
+
+ private boolean _readyToConstruct;
+ private boolean _hasReturnedAggregateBlock;
// TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
// aggCalls has to be a list of FunctionCall and cannot be null
@@ -95,7 +97,8 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
}
_resultSchema = dataSchema;
- _isCumulativeBlockConstructed = false;
+ _readyToConstruct = false;
+ _hasReturnedAggregateBlock = false;
}
private RexExpression toAggregationFunctionOperand(RexExpression rexExpression) {
@@ -119,8 +122,16 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
try {
- consumeInputBlocks();
- return produceAggregatedBlock();
+ if (!_readyToConstruct) {
+ consumeInputBlocks();
+ return TransferableBlockUtils.getNoOpTransferableBlock();
+ }
+
+ if (!_hasReturnedAggregateBlock) {
+ return produceAggregatedBlock();
+ } else {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
@@ -131,58 +142,52 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
}
- if (!_isCumulativeBlockConstructed) {
- List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
- for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
- Object[] row = new Object[_aggCalls.size() + _groupSet.size()];
- Object[] keyElements = e.getValue();
- for (int i = 0; i < keyElements.length; i++) {
- row[i] = keyElements[i];
- }
- for (int i = 0; i < _groupByResultHolders.length; i++) {
- row[i + _groupSet.size()] = _groupByResultHolders[i].get(e.getKey());
- }
- rows.add(row);
- }
- _isCumulativeBlockConstructed = true;
- if (rows.size() == 0) {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
- } else {
- return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
+
+ List<Object[]> rows = new ArrayList<>(_groupByKeyHolder.size());
+ for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+ Object[] row = new Object[_aggCalls.size() + _groupSet.size()];
+ Object[] keyElements = e.getValue();
+ System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+ for (int i = 0; i < _groupByResultHolders.length; i++) {
+ row[i + _groupSet.size()] = _groupByResultHolders[i].get(e.getKey());
}
+ rows.add(row);
+ }
+ _hasReturnedAggregateBlock = true;
+ if (rows.size() == 0) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
} else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_resultSchema);
+ return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
}
}
private void consumeInputBlocks() {
- if (!_isCumulativeBlockConstructed) {
- TransferableBlock block = _inputOperator.nextBlock();
- while (!TransferableBlockUtils.isEndOfStream(block)) {
- BaseDataBlock dataBlock = block.getDataBlock();
- int numRows = dataBlock.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
- Key key = extraRowKey(row, _groupSet);
- _groupByKeyHolder.put(key, key.getValues());
- for (int i = 0; i < _aggCalls.size(); i++) {
- Object currentRes = _groupByResultHolders[i].get(key);
- // TODO: fix that single agg result (original type) has different type from multiple agg results (double).
- if (currentRes == null) {
- _groupByResultHolders[i].put(key, _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
- : row[_aggregationFunctionInputRefs[i]]);
- } else {
- _groupByResultHolders[i].put(key, merge(_aggCalls.get(i), currentRes,
- _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
- : row[_aggregationFunctionInputRefs[i]]));
- }
- }
+ TransferableBlock block = _inputOperator.nextBlock();
+ // setting upstream error block
+ if (block.isErrorBlock()) {
+ _upstreamErrorBlock = block;
+ } else if (block.isEndOfStreamBlock()) {
+ _readyToConstruct = true;
+ return;
+ }
+
+ BaseDataBlock dataBlock = block.getDataBlock();
+ int numRows = dataBlock.getNumberOfRows();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+ Key key = extraRowKey(row, _groupSet);
+ _groupByKeyHolder.put(key, key.getValues());
+ for (int i = 0; i < _aggCalls.size(); i++) {
+ Object currentRes = _groupByResultHolders[i].get(key);
+ // TODO: fix that single agg result (original type) has different type from multiple agg results (double).
+ if (currentRes == null) {
+ _groupByResultHolders[i].put(key, _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
+ : row[_aggregationFunctionInputRefs[i]]);
+ } else {
+ _groupByResultHolders[i].put(key, merge(_aggCalls.get(i), currentRes,
+ _aggregationFunctionInputRefs[i] == -1 ? _aggregationFunctionLiterals[i]
+ : row[_aggregationFunctionInputRefs[i]]));
}
- block = _inputOperator.nextBlock();
- }
- // setting upstream error block
- if (block.isErrorBlock()) {
- _upstreamErrorBlock = block;
}
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index b3aa17ac56..5620cec86f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
@@ -71,21 +70,20 @@ public class FilterOperator extends BaseOperator<TransferableBlock> {
throws Exception {
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
- }
- if (!TransferableBlockUtils.isEndOfStream(block)) {
- List<Object[]> resultRows = new ArrayList<>();
- List<Object[]> container = block.getContainer();
- for (Object[] row : container) {
- if (_filterOperand.apply(row)) {
- resultRows.add(row);
- }
- }
- return new TransferableBlock(resultRows, _dataSchema, BaseDataBlock.Type.ROW);
} else if (block.isErrorBlock()) {
_upstreamErrorBlock = block;
return _upstreamErrorBlock;
- } else {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_dataSchema));
+ } else if (TransferableBlockUtils.isEndOfStream(block) || TransferableBlockUtils.isNoOpBlock(block)) {
+ return block;
+ }
+
+ List<Object[]> resultRows = new ArrayList<>();
+ List<Object[]> container = block.getContainer();
+ for (Object[] row : container) {
+ if (_filterOperand.apply(row)) {
+ resultRows.add(row);
+ }
}
+ return new TransferableBlock(resultRows, _dataSchema, BaseDataBlock.Type.ROW);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 61cd18ff5a..1c3111cbb9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -25,7 +25,6 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.Key;
@@ -99,11 +98,17 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
- // Build JOIN hash table
- buildBroadcastHashTable();
+ if (!_isHashTableBuilt) {
+ // Build JOIN hash table
+ buildBroadcastHashTable();
+ }
+
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
+ } else if (!_isHashTableBuilt) {
+ return TransferableBlockUtils.getNoOpTransferableBlock();
}
+
// JOIN each left block with the right block.
try {
return buildJoinedDataBlock(_leftTableOperator.nextBlock());
@@ -113,54 +118,58 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
}
private void buildBroadcastHashTable() {
- if (!_isHashTableBuilt) {
- TransferableBlock rightBlock = _rightTableOperator.nextBlock();
- while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
- List<Object[]> container = rightBlock.getContainer();
- // put all the rows into corresponding hash collections keyed by the key selector function.
- for (Object[] row : container) {
- List<Object[]> hashCollection =
- _broadcastHashTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
- hashCollection.add(row);
- }
- rightBlock = _rightTableOperator.nextBlock();
- }
- if (rightBlock.isErrorBlock()) {
- _upstreamErrorBlock = rightBlock;
- }
+ TransferableBlock rightBlock = _rightTableOperator.nextBlock();
+ if (rightBlock.isErrorBlock()) {
+ _upstreamErrorBlock = rightBlock;
+ return;
+ }
+
+ if (TransferableBlockUtils.isEndOfStream(rightBlock)) {
_isHashTableBuilt = true;
+ return;
+ } else if (TransferableBlockUtils.isNoOpBlock(rightBlock)) {
+ return;
+ }
+
+ List<Object[]> container = rightBlock.getContainer();
+ // put all the rows into corresponding hash collections keyed by the key selector function.
+ for (Object[] row : container) {
+ List<Object[]> hashCollection =
+ _broadcastHashTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
+ hashCollection.add(row);
}
}
private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
throws Exception {
- if (!TransferableBlockUtils.isEndOfStream(leftBlock)) {
- List<Object[]> rows = new ArrayList<>();
- List<Object[]> container = leftBlock.getContainer();
- for (Object[] leftRow : container) {
- List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
- new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
- // If it is a left join and right table is empty, we return left rows.
- if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) {
- rows.add(joinRow(leftRow, null));
- } else {
- // If it is other type of join.
- for (Object[] rightRow : hashCollection) {
- Object[] resultRow = joinRow(leftRow, rightRow);
- if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
- evaluator -> evaluator.apply(resultRow))) {
- rows.add(resultRow);
- }
+ if (leftBlock.isErrorBlock()) {
+ _upstreamErrorBlock = leftBlock;
+ return _upstreamErrorBlock;
+ } else if (TransferableBlockUtils.isEndOfStream(leftBlock) || TransferableBlockUtils.isNoOpBlock(leftBlock)) {
+ return leftBlock;
+ }
+
+ List<Object[]> rows = new ArrayList<>();
+ List<Object[]> container = leftBlock.getContainer();
+ for (Object[] leftRow : container) {
+ List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
+ new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
+ // If it is a left join and right table is empty, we return left rows.
+ if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) {
+ rows.add(joinRow(leftRow, null));
+ } else {
+ // If it is other type of join.
+ for (Object[] rightRow : hashCollection) {
+ Object[] resultRow = joinRow(leftRow, rightRow);
+ if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
+ evaluator -> evaluator.apply(resultRow))) {
+ rows.add(resultRow);
}
}
}
- return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
- } else if (leftBlock.isErrorBlock()) {
- _upstreamErrorBlock = leftBlock;
- return _upstreamErrorBlock;
- } else {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
}
+
+ return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
}
private Object[] joinRow(Object[] leftRow, @Nullable Object[] rightRow) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 9ad6c6fb22..223ab4a04a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -61,7 +61,7 @@ public class LiteralValueOperator extends BaseOperator<TransferableBlock> {
_isLiteralBlockReturned = true;
return _rexLiteralBlock;
} else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 781eaccc2c..d45d5bceb1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -58,6 +58,8 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
private final long _jobId;
private final int _stageId;
private final long _timeout;
+
+ private int _serverIdx;
private TransferableBlock _upstreamErrorBlock;
public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
@@ -90,9 +92,10 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
_port = port;
_jobId = jobId;
_stageId = stageId;
- _timeout = QueryConfig.DEFAULT_TIMEOUT_NANO;
+ _timeout = System.nanoTime() + QueryConfig.DEFAULT_TIMEOUT_NANO;
_upstreamErrorBlock = null;
_keySelector = keySelector;
+ _serverIdx = 0;
}
@Override
@@ -111,37 +114,46 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
protected TransferableBlock getNextBlock() {
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
+ } else if (System.nanoTime() >= _timeout) {
+ LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+ return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
}
- // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
- boolean hasOpenedMailbox = true;
- long timeoutWatermark = System.nanoTime() + _timeout;
- while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
- hasOpenedMailbox = false;
- for (ServerInstance sendingInstance : _sendingStageInstances) {
- try {
- ReceivingMailbox<TransferableBlock> receivingMailbox =
- _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
- // TODO this is not threadsafe.
- // make sure only one thread is checking receiving mailbox and calling receive() then close()
- if (!receivingMailbox.isClosed()) {
- hasOpenedMailbox = true;
- TransferableBlock transferableBlock = receivingMailbox.receive();
- if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
- // Return the block only if it has some valid data
- return transferableBlock;
+
+ int startingIdx = _serverIdx;
+ int openMailboxCount = 0;
+ int eosCount = 0;
+
+ for (int i = 0; i < _sendingStageInstances.size(); i++) {
+ // this implements a round-robin mailbox iterator so we don't starve any mailboxes
+ _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
+
+ ServerInstance server = _sendingStageInstances.get(_serverIdx);
+ try {
+ ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+ if (!mailbox.isClosed()) {
+ openMailboxCount++;
+
+ // this is blocking for 100ms and may return null
+ TransferableBlock block = mailbox.receive();
+ if (block != null) {
+ if (!block.isEndOfStreamBlock()) {
+ return block;
}
+ eosCount++;
}
- } catch (Exception e) {
- LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e);
}
+ } catch (Exception e) {
+ LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
}
}
- if (System.nanoTime() >= timeoutWatermark) {
- LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
- return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
- } else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
- }
+
+ // if we opened at least one mailbox, but still got to this point, then that means
+ // all the mailboxes we opened returned null but were not yet closed - early terminate
+ // with a noop block. Otherwise, we have exhausted all data from all mailboxes and can
+ // return EOS
+ return openMailboxCount > 0
+ ? TransferableBlockUtils.getNoOpTransferableBlock()
+ : TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
public RelDistribution.Type getExchangeType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index a53864ab56..a26fa19626 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -114,7 +114,20 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
@Override
protected TransferableBlock getNextBlock() {
- TransferableBlock transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+ TransferableBlock transferableBlock;
+ try {
+ transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+ } catch (final Exception e) {
+ // ideally, MailboxSendOperator doesn't ever throw an exception because
+ // it will just get swallowed, in this scenario at least we can forward
+ // any upstream exceptions as an error block
+ transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+ }
+
+ if (TransferableBlockUtils.isNoOpBlock(transferableBlock)) {
+ return transferableBlock;
+ }
+
boolean isEndOfStream = TransferableBlockUtils.isEndOfStream(transferableBlock);
BaseDataBlock.Type type = transferableBlock.getType();
try {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 1acb0c9a69..e710122d81 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -44,6 +44,7 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
private final PriorityQueue<Object[]> _rows;
private final int _numRowsToKeep;
+ private boolean _readyToConstruct;
private boolean _isSortedBlockConstructed;
private TransferableBlock _upstreamErrorBlock;
@@ -87,7 +88,10 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
throws IOException {
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
+ } else if (!_readyToConstruct) {
+ return TransferableBlockUtils.getNoOpTransferableBlock();
}
+
if (!_isSortedBlockConstructed) {
LinkedList<Object[]> rows = new LinkedList<>();
while (_rows.size() > _offset) {
@@ -96,30 +100,34 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
}
_isSortedBlockConstructed = true;
if (rows.size() == 0) {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
} else {
return new TransferableBlock(rows, _dataSchema, BaseDataBlock.Type.ROW);
}
} else {
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
}
private void consumeInputBlocks() {
if (!_isSortedBlockConstructed) {
TransferableBlock block = _upstreamOperator.nextBlock();
- while (!TransferableBlockUtils.isEndOfStream(block)) {
- BaseDataBlock dataBlock = block.getDataBlock();
- int numRows = dataBlock.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
- SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
- }
- block = _upstreamOperator.nextBlock();
- }
// setting upstream error block
if (block.isErrorBlock()) {
_upstreamErrorBlock = block;
+ return;
+ } else if (TransferableBlockUtils.isEndOfStream(block)) {
+ _readyToConstruct = true;
+ return;
+ } else if (TransferableBlockUtils.isNoOpBlock(block)) {
+ return;
+ }
+
+ BaseDataBlock dataBlock = block.getDataBlock();
+ int numRows = dataBlock.getNumberOfRows();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+ SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
}
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 90efc377ab..a530f0bc88 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
@@ -77,25 +76,26 @@ public class TransformOperator extends BaseOperator<TransferableBlock> {
private TransferableBlock transform(TransferableBlock block)
throws Exception {
+ if (block.isErrorBlock()) {
+ _upstreamErrorBlock = block;
+ }
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
}
- if (!TransferableBlockUtils.isEndOfStream(block)) {
- List<Object[]> resultRows = new ArrayList<>();
- List<Object[]> container = block.getContainer();
- for (Object[] row : container) {
- Object[] resultRow = new Object[_resultColumnSize];
- for (int i = 0; i < _resultColumnSize; i++) {
- resultRow[i] = _transformOperandsList.get(i).apply(row);
- }
- resultRows.add(resultRow);
+
+ if (TransferableBlockUtils.isEndOfStream(block) || TransferableBlockUtils.isNoOpBlock(block)) {
+ return block;
+ }
+
+ List<Object[]> resultRows = new ArrayList<>();
+ List<Object[]> container = block.getContainer();
+ for (Object[] row : container) {
+ Object[] resultRow = new Object[_resultColumnSize];
+ for (int i = 0; i < _resultColumnSize; i++) {
+ resultRow[i] = _transformOperandsList.get(i).apply(row);
}
- return new TransferableBlock(resultRows, _resultSchema, BaseDataBlock.Type.ROW);
- } else if (block.isErrorBlock()) {
- _upstreamErrorBlock = block;
- return _upstreamErrorBlock;
- } else {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_resultSchema));
+ resultRows.add(resultRow);
}
+ return new TransferableBlock(resultRows, _resultSchema, BaseDataBlock.Type.ROW);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index 76e10e92dc..3b357466e0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -22,7 +22,7 @@ package org.apache.pinot.query.service;
* Configuration for setting up query runtime.
*/
public class QueryConfig {
- public static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
+ public static final long DEFAULT_TIMEOUT_NANO = 100_000_000_000L;
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = 16 * 1024 * 1024;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index b29700dde0..229ede0c16 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -22,14 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.util.Pair;
-import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
@@ -75,7 +72,8 @@ public class QueryDispatcher {
requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(),
mailboxService.getMailboxPort());
List<DataTable> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
- return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields());
+ return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
+ queryPlan.getQueryStageMap().get(0).getDataSchema());
}
public int submit(long requestId, QueryPlan queryPlan)
@@ -137,26 +135,23 @@ public class QueryDispatcher {
throw new RuntimeException("Received error query execution result block: "
+ transferableBlock.getDataBlock().getExceptions());
}
- if (transferableBlock.getDataBlock() != null) {
- BaseDataBlock dataTable = transferableBlock.getDataBlock();
- resultDataBlocks.add(dataTable);
+ if (transferableBlock.isNoOpBlock()) {
+ continue;
+ } else if (transferableBlock.isEndOfStreamBlock()) {
+ return resultDataBlocks;
}
- if (transferableBlock.isEndOfStreamBlock()) {
- break;
- }
- }
- if (System.nanoTime() >= timeoutWatermark) {
- resultDataBlocks = Collections.singletonList(
- DataBlockUtils.getErrorDataBlock(QueryException.EXECUTION_TIMEOUT_ERROR));
+
+ resultDataBlocks.add(transferableBlock.getDataBlock());
}
- return resultDataBlocks;
+
+ throw new RuntimeException("Timed out while receiving from mailbox: " + QueryException.EXECUTION_TIMEOUT_ERROR);
}
- public static ResultTable toResultTable(List<DataTable> queryResult, List<Pair<Integer, String>> fields) {
- DataSchema resultSchema = null;
+ public static ResultTable toResultTable(List<DataTable> queryResult, List<Pair<Integer, String>> fields,
+ DataSchema sourceSchema) {
List<Object[]> resultRows = new ArrayList<>();
+ DataSchema resultSchema = toResultSchema(sourceSchema, fields);
for (DataTable dataTable : queryResult) {
- resultSchema = resultSchema == null ? toResultSchema(dataTable.getDataSchema(), fields) : resultSchema;
int numColumns = resultSchema.getColumnNames().length;
int numRows = dataTable.getNumberOfRows();
DataSchema.ColumnDataType[] resultColumnDataTypes = resultSchema.getColumnDataTypes();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 6f41434e03..a51b9c8c0c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -95,7 +95,7 @@ public class InMemoryMailboxServiceTest {
private TransferableBlock getTestTransferableBlock(int index, boolean isEndOfStream) {
if (isEndOfStream) {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(TEST_DATA_SCHEMA));
+ return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
}
List<Object[]> rows = new ArrayList<>(index);
rows.add(new Object[]{index, "test_data"});
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 3c985d77aa..b614b49b7f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -85,7 +85,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
}
Preconditions.checkNotNull(mailboxReceiveOperator);
return QueryDispatcher.toResultTable(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator),
- queryPlan.getQueryResultFields()).getRows();
+ queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
}
private List<Object[]> queryH2(String sql)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
index 4fa2c8aa9c..0f9ac7fdf8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/TransferableBlockUtilsTest.java
@@ -88,7 +88,7 @@ public class TransferableBlockUtilsTest {
validateNonSplittableBlock(columnarBlock);
// METADATA
- MetadataBlock metadataBlock = new MetadataBlock();
+ MetadataBlock metadataBlock = new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
validateNonSplittableBlock(metadataBlock);
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 97e27e10ab..d467ba8112 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -56,6 +56,9 @@ public class AggregateOperatorTest {
new AggregateOperator(_upstreamOperator, OperatorTestUtil.TEST_DATA_SCHEMA, Arrays.asList(agg),
Arrays.asList(new RexExpression.InputRef(1)));
TransferableBlock result = sum0GroupBy1.getNextBlock();
+ while (result.isNoOpBlock()) {
+ result = sum0GroupBy1.getNextBlock();
+ }
List<Object[]> resultRows = result.getContainer();
List<Object[]> expectedRows = Arrays.asList(new Object[]{"Aa", 1}, new Object[]{"BB", 5.0});
Assert.assertEquals(resultRows.size(), expectedRows.size());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 768371636d..9975cd57ff 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -71,7 +71,10 @@ public class HashJoinOperatorTest {
HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
- TransferableBlock result = join.getNextBlock();
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
List<Object[]> resultRows = result.getContainer();
List<Object[]> expectedRows =
Arrays.asList(new Object[]{1, "Aa", 1, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"},
@@ -101,7 +104,10 @@ public class HashJoinOperatorTest {
HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
- TransferableBlock result = join.getNextBlock();
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
List<Object[]> resultRows = result.getContainer();
Object[] expRow = new Object[]{1, "Aa", 2, "Aa"};
List<Object[]> expectedRows = new ArrayList<>();
@@ -127,7 +133,10 @@ public class HashJoinOperatorTest {
HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT);
- TransferableBlock result = join.getNextBlock();
+ TransferableBlock result = join.nextBlock();
+ while (result.isNoOpBlock()) {
+ result = join.nextBlock();
+ }
List<Object[]> resultRows = result.getContainer();
List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", null, null},
new Object[]{3, "BB", null, null});
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index da8a9cc599..aaa190a3d4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -33,11 +33,11 @@ public class OperatorTestUtil {
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
public static TransferableBlock getEndOfStreamRowBlock() {
- return getEndOfStreamRowBlockWithSchema(TEST_DATA_SCHEMA);
+ return getEndOfStreamRowBlockWithSchema();
}
- public static TransferableBlock getEndOfStreamRowBlockWithSchema(DataSchema schema) {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(schema));
+ public static TransferableBlock getEndOfStreamRowBlockWithSchema() {
+ return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
}
public static TransferableBlock getRowDataBlock(List<Object[]> rows) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org