You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/06 22:38:38 UTC
(pinot) branch master updated: [bugfix][multistage] explicit warning flags set on each stage stats (#11936)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d177866527 [bugfix][multistage] explicit warning flags set on each stage stats (#11936)
d177866527 is described below
commit d177866527e3af3b8cdd350ac6b0762edc99b43a
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Nov 6 14:38:32 2023 -0800
[bugfix][multistage] explicit warning flags set on each stage stats (#11936)
* set explicit warning flags on each stage
* put a top-level flag for maxRowsInJoinReached b/c the UI depends on the top-level flag only
* simplify the logic in early termination for HashJoinOperator per comments
* set partial result flag in multi-stage handler
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../requesthandler/BaseBrokerRequestHandler.java | 8 +++-
.../MultiStageBrokerRequestHandler.java | 3 ++
.../apache/pinot/common/datatable/DataTable.java | 5 ++-
.../pinot/common/response/BrokerResponse.java | 5 +++
.../response/broker/BrokerResponseNative.java | 21 +++++++--
.../response/broker/BrokerResponseNativeV2.java | 11 ++---
.../response/broker/BrokerResponseStats.java | 14 +++---
.../query/reduce/ExecutionStatsAggregator.java | 6 +++
.../query/runtime/operator/AggregateOperator.java | 8 ++--
.../query/runtime/operator/HashJoinOperator.java | 32 ++++++--------
.../runtime/operator/AggregateOperatorTest.java | 50 ++++++++++++++++++++++
.../runtime/operator/HashJoinOperatorTest.java | 16 +++++--
12 files changed, 133 insertions(+), 46 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index eed9ae56d4..bcc5049ff0 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -775,8 +775,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
1);
}
- brokerResponse.setPartialResult(
- brokerResponse.isNumGroupsLimitReached() || brokerResponse.getExceptionsSize() > 0);
+ brokerResponse.setPartialResult(isPartialResult(brokerResponse));
// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
@@ -1821,6 +1820,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
RequestContext requestContext)
throws Exception;
+ protected static boolean isPartialResult(BrokerResponse brokerResponse) {
+ return brokerResponse.isNumGroupsLimitReached() || brokerResponse.isMaxRowsInJoinReached()
+ || brokerResponse.getExceptionsSize() > 0;
+ }
+
protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setTotalDocs(response.getTotalDocs());
statistics.setNumDocsScanned(response.getNumDocsScanned());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index f3cbaffb1b..d329a99f69 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -238,6 +238,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
brokerResponse.addStageStat(entry.getKey(), brokerResponseStats);
}
+ // Set partial result flag
+ brokerResponse.setPartialResult(isPartialResult(brokerResponse));
+
// Set total query processing time
// TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 928834359f..72121611eb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -134,11 +134,12 @@ public interface DataTable {
OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs", MetadataValueType.LONG),
OPERATOR_ID(31, "operatorId", MetadataValueType.STRING),
OPERATOR_EXEC_START_TIME_MS(32, "operatorExecStartTimeMs", MetadataValueType.LONG),
- OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG);
+ OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs", MetadataValueType.LONG),
+ MAX_ROWS_IN_JOIN_REACHED(34, "maxRowsInJoinReached", MetadataValueType.STRING);
// We keep this constant to track the max id added so far for backward compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
- private static final int MAX_ID = 33;
+ private static final int MAX_ID = 34;
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 89fcc8c04b..06f65fc2aa 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -133,6 +133,11 @@ public interface BrokerResponse {
*/
boolean isNumGroupsLimitReached();
+ /**
+ * Returns whether the limit for max rows in join has been reached.
+ */
+ boolean isMaxRowsInJoinReached();
+
/**
* Get number of exceptions recorded in the response.
*/
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 83887d0a3b..9fe098e26d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -44,10 +44,12 @@ import org.apache.pinot.spi.utils.JsonUtils;
"resultTable", "requestId", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
- "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
- "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
- "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
- "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo", "partialResult"})
+ "numEntriesScannedPostFilter", "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs",
+ "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+ "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+ "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "brokerReduceTimeMs",
+ "segmentStatistics", "traceInfo", "partialResult"
+})
public class BrokerResponseNative implements BrokerResponse {
public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
public static final BrokerResponseNative NO_TABLE_RESULT =
@@ -76,6 +78,7 @@ public class BrokerResponseNative implements BrokerResponse {
private long _totalDocs = 0L;
private boolean _numGroupsLimitReached = false;
+ private boolean _maxRowsInJoinReached = false;
private boolean _partialResult = false;
private long _timeUsedMs = 0L;
private long _offlineThreadCpuTimeNs = 0L;
@@ -494,6 +497,16 @@ public class BrokerResponseNative implements BrokerResponse {
_numGroupsLimitReached = numGroupsLimitReached;
}
+ @JsonProperty("maxRowsInJoinReached")
+ public boolean isMaxRowsInJoinReached() {
+ return _maxRowsInJoinReached;
+ }
+
+ @JsonProperty("maxRowsInJoinReached")
+ public void setMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
+ _maxRowsInJoinReached = maxRowsInJoinReached;
+ }
+
@JsonProperty("partialResult")
public boolean isPartialResult() {
return _partialResult;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 95943881dc..60a22460f3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -37,13 +37,14 @@ import org.apache.pinot.spi.utils.JsonUtils;
* Supports serialization via JSON.
*/
@JsonPropertyOrder({
- "resultTable", "requestId", "stageStats", "exceptions", "numServersQueried", "numServersResponded",
+ "resultTable", "requestId", "stageStats", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
- "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
- "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
- "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
- "realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo"
+ "numEntriesScannedPostFilter", "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs",
+ "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+ "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+ "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "brokerReduceTimeMs",
+ "segmentStatistics", "traceInfo", "partialResult"
})
public class BrokerResponseNativeV2 extends BrokerResponseNative {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index db23034efc..3acc9f3349 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -33,14 +33,16 @@ import org.apache.pinot.spi.utils.JsonUtils;
// same metadataKey
// TODO: Replace member fields with a simple map of <MetadataKey, Object>
// TODO: Add a subStat field, stage level subStats will contain each operator stats
-@JsonPropertyOrder({"brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs",
- "stageExecutionUnit", "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded",
- "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
- "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
- "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+@JsonPropertyOrder({
+ "brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
+ "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried",
+ "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+ "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+ "numGroupsLimitReached", "maxRowsInJoinReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
- "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"})
+ "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"
+})
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class BrokerResponseStats extends BrokerResponseNative {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index bc8612c551..c05644af5f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -74,6 +74,7 @@ public class ExecutionStatsAggregator {
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
private boolean _numGroupsLimitReached = false;
+ private boolean _maxRowsInJoinReached = false;
private int _numBlocks = 0;
private int _numRows = 0;
private long _stageExecutionTimeMs = 0;
@@ -250,6 +251,8 @@ public class ExecutionStatsAggregator {
_numGroupsLimitReached |=
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+ _maxRowsInJoinReached |=
+ Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName()));
String numBlocksString = metadata.get(DataTable.MetadataKey.NUM_BLOCKS.getName());
if (numBlocksString != null) {
@@ -306,6 +309,7 @@ public class ExecutionStatsAggregator {
brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
brokerResponseNative.setTotalDocs(_numTotalDocs);
brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
+ brokerResponseNative.setMaxRowsInJoinReached(_maxRowsInJoinReached);
brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
@@ -369,6 +373,8 @@ public class ExecutionStatsAggregator {
brokerResponseStats.setNumBlocks(_numBlocks);
brokerResponseStats.setNumRows(_numRows);
+ brokerResponseStats.setMaxRowsInJoinReached(_maxRowsInJoinReached);
+ brokerResponseStats.setNumGroupsLimitReached(_numGroupsLimitReached);
brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit);
brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 27b6cbdf08..5cb825c149 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.datablock.DataBlock;
-import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
@@ -160,9 +160,9 @@ public class AggregateOperator extends MultiStageOperator {
} else {
TransferableBlock dataBlock = new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
if (_groupByExecutor.isNumGroupsLimitReached()) {
- dataBlock.addException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE,
- String.format("Reached numGroupsLimit of: %d for group-by, ignoring the extra groups",
- _groupByExecutor.getNumGroupsLimit()));
+ OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+ operatorStats.recordSingleStat(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
+ _inputOperator.earlyTerminate();
}
return dataBlock;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 295e0c49af..c508b373c5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
@@ -110,7 +111,6 @@ public class HashJoinOperator extends MultiStageOperator {
private final JoinOverFlowMode _joinOverflowMode;
private int _currentRowsInHashTable = 0;
- private ProcessingException _resourceLimitExceededException = null;
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftTableOperator,
MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode node) {
@@ -191,7 +191,7 @@ public class HashJoinOperator extends MultiStageOperator {
protected TransferableBlock getNextBlock() {
try {
if (_isTerminated) {
- return setPartialResultExceptionToBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
if (!_isHashTableBuilt) {
// Build JOIN hash table
@@ -202,7 +202,7 @@ public class HashJoinOperator extends MultiStageOperator {
}
TransferableBlock leftBlock = _leftTableOperator.nextBlock();
// JOIN each left block with the right block.
- return setPartialResultExceptionToBlock(buildJoinedDataBlock(leftBlock));
+ return buildJoinedDataBlock(leftBlock);
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
@@ -215,16 +215,21 @@ public class HashJoinOperator extends MultiStageOperator {
List<Object[]> container = rightBlock.getContainer();
// Row based overflow check.
if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) {
- _resourceLimitExceededException =
- new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
- _resourceLimitExceededException.setMessage(
- "Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable);
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
- throw _resourceLimitExceededException;
+ ProcessingException resourceLimitExceededException =
+ new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+ resourceLimitExceededException.setMessage(
+ "Cannot build in memory hash table for join operator, reach number of rows limit: "
+ + _maxRowsInHashTable);
+ throw resourceLimitExceededException;
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable;
container = container.subList(0, remainingRows);
+ OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+ operatorStats.recordSingleStat(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName(), "true");
+ // setting only the rightTableOperator to be early terminated and awaits EOS block next.
+ _rightTableOperator.earlyTerminate();
}
}
// put all the rows into corresponding hash collections keyed by the key selector function.
@@ -238,10 +243,6 @@ public class HashJoinOperator extends MultiStageOperator {
hashCollection.add(row);
}
_currentRowsInHashTable += container.size();
- if (_currentRowsInHashTable == _maxRowsInHashTable) {
- // setting only the rightTableOperator to be early terminated and awaits EOS block next.
- _rightTableOperator.earlyTerminate();
- }
rightBlock = _rightTableOperator.nextBlock();
}
if (rightBlock.isErrorBlock()) {
@@ -300,13 +301,6 @@ public class HashJoinOperator extends MultiStageOperator {
return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
}
- private TransferableBlock setPartialResultExceptionToBlock(TransferableBlock block) {
- if (_resourceLimitExceededException != null) {
- block.addException(_resourceLimitExceededException);
- }
- return block;
- }
-
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 0ad3276d26..93b65dad7f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -18,17 +18,25 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.AggregateNode.AggType;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -65,6 +73,12 @@ public class AggregateOperatorTest {
_mocks.close();
}
+ private static AbstractPlanNode.NodeHint getAggHints(Map<String, String> hintsMap) {
+ RelHint.Builder relHintBuilder = RelHint.builder(PinotHintOptions.AGGREGATE_HINT_OPTIONS);
+ hintsMap.forEach(relHintBuilder::hintOption);
+ return new AbstractPlanNode.NodeHint(ImmutableList.of(relHintBuilder.build()));
+ }
+
@Test
public void shouldHandleUpstreamErrorBlocks() {
// Given:
@@ -256,6 +270,42 @@ public class AggregateOperatorTest {
"expected it to fail with class cast exception");
}
+ @Test
+ public void shouldHandleGroupLimitExceed() {
+ // Given:
+ List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+ List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, DOUBLE});
+ Mockito.when(_input.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1.0}, new Object[]{3, 2.0}))
+ .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, 3.0}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
+ OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
+ Map<String, String> hintsMap = ImmutableMap.of(PinotHintOptions.AggregateOptions.NUM_GROUPS_LIMIT, "1");
+ AggregateOperator operator =
+ new AggregateOperator(context, _input, outSchema, calls, group, AggType.DIRECT, Collections.singletonList(-1),
+ getAggHints(hintsMap));
+
+ // When:
+ TransferableBlock block1 = operator.nextBlock();
+ TransferableBlock block2 = operator.nextBlock();
+
+ // Then
+ Mockito.verify(_input).earlyTerminate();
+
+ // Then:
+ Assert.assertTrue(block1.getNumRows() == 1, "when group limit reach it should only return that many groups");
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "Second block is EOS (done processing)");
+ String operatorId =
+ Joiner.on("_").join(AggregateOperator.class.getSimpleName(), context.getStageId(), context.getServer());
+ OperatorStats operatorStats = context.getStats().getOperatorStats(context, operatorId);
+ Assert.assertEquals(operatorStats.getExecutionStats().get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()),
+ "true");
+ }
+
private static RexExpression.FunctionCall getSum(RexExpression arg) {
return new RexExpression.FunctionCall(SqlKind.SUM, ColumnDataType.INT, "SUM", ImmutableList.of(arg));
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 88d9cf4fe0..fa99ce0c01 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
@@ -29,6 +30,7 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -37,6 +39,7 @@ import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -642,15 +645,20 @@ public class HashJoinOperatorTest {
PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1");
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, getJoinHints(hintsMap));
- HashJoinOperator join =
- new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
+
+ OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
+ HashJoinOperator join = new HashJoinOperator(context, _leftOperator, _rightOperator, leftSchema, node);
TransferableBlock result = join.nextBlock();
Mockito.verify(_rightOperator).earlyTerminate();
Assert.assertFalse(result.isErrorBlock());
Assert.assertEquals(result.getNumRows(), 1);
- Assert.assertTrue(result.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
- .contains("reach number of rows limit"));
+
+ String operatorId =
+ Joiner.on("_").join(HashJoinOperator.class.getSimpleName(), context.getStageId(), context.getServer());
+ OperatorStats operatorStats = context.getStats().getOperatorStats(context, operatorId);
+ Assert.assertEquals(
+ operatorStats.getExecutionStats().get(DataTable.MetadataKey.MAX_ROWS_IN_JOIN_REACHED.getName()), "true");
}
}
// TODO: Add more inequi join tests.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org