You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/02/22 04:50:28 UTC
[pinot] branch master updated: Remove duplicate stats aggregator from V2 query metrics (#10299)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0722bf077b Remove duplicate stats aggregator from V2 query metrics (#10299)
0722bf077b is described below
commit 0722bf077b2d23783ea463a4f615847057e7dbbf
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Feb 22 10:20:22 2023 +0530
Remove duplicate stats aggregator from V2 query metrics (#10299)
* Remove duplicate stats aggregator from V2 query metrics
* Add multistage specific operator stats to metadata
* add test for metadata
Add tests for metadata, refactor the Operator interfaces to accept serverId
* Rename wallTime to executionTime for correct nomenclature in metrics
* Add serverAddress as a separate param in operatorStats as well
* Remove redundant constructor without server addresses
* Merge metadata test into resource based queries test
* fix broken SortOperatorTest
---------
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../MultiStageBrokerRequestHandler.java | 53 +---
.../pinot/core/query/reduce/BaseReduceService.java | 249 ------------------
.../query/reduce/ExecutionStatsAggregator.java | 289 +++++++++++++++++++++
.../query/reduce/StreamingReduceServiceTest.java | 4 +-
.../apache/pinot/query/runtime/QueryRunner.java | 5 +-
.../query/runtime/operator/AggregateOperator.java | 11 +-
.../query/runtime/operator/FilterOperator.java | 5 +-
.../query/runtime/operator/HashJoinOperator.java | 5 +-
.../LeafStageTransferableBlockOperator.java | 7 +-
.../runtime/operator/LiteralValueOperator.java | 5 +-
.../runtime/operator/MailboxReceiveOperator.java | 3 +-
.../runtime/operator/MailboxSendOperator.java | 6 +-
.../query/runtime/operator/MultiStageOperator.java | 13 +-
.../query/runtime/operator/OperatorStats.java | 34 ++-
.../pinot/query/runtime/operator/SortOperator.java | 9 +-
.../query/runtime/operator/TransformOperator.java | 7 +-
.../runtime/operator/utils/OperatorUtils.java | 11 +-
.../runtime/operator/utils/StatsAggregator.java | 170 ------------
.../query/runtime/plan/PhysicalPlanVisitor.java | 16 +-
.../pinot/query/service/QueryDispatcher.java | 48 +---
.../pinot/query/runtime/QueryRunnerTest.java | 4 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 6 +-
.../runtime/operator/AggregateOperatorTest.java | 24 +-
.../query/runtime/operator/FilterOperatorTest.java | 33 ++-
.../runtime/operator/HashJoinOperatorTest.java | 46 ++--
.../LeafStageTransferableBlockOperatorTest.java | 48 +++-
.../runtime/operator/LiteralValueOperatorTest.java | 19 +-
.../runtime/operator/MailboxSendOperatorTest.java | 56 ++--
.../query/runtime/operator/SortOperatorTest.java | 68 +++--
.../runtime/operator/TransformOperatorTest.java | 24 +-
.../runtime/queries/ResourceBasedQueriesTest.java | 93 ++++++-
31 files changed, 666 insertions(+), 705 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 7e21bab7ab..63c1e8f9ad 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,7 +20,6 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -32,7 +31,6 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -43,6 +41,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
@@ -167,10 +166,10 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
}
ResultTable queryResults;
- Map<String, String> metadata = new HashMap<>();
+ ExecutionStatsAggregator executionStatsAggregator = new ExecutionStatsAggregator(false);
try {
queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs,
- sqlNodeAndOptions.getOptions(), metadata);
+ sqlNodeAndOptions.getOptions(), executionStatsAggregator);
} catch (Exception e) {
LOGGER.info("query execution failed", e);
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
@@ -185,57 +184,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
brokerResponse.setTimeUsedMs(totalTimeMs);
brokerResponse.setResultTable(queryResults);
- attachMetadataToResponse(metadata, brokerResponse);
+ executionStatsAggregator.setStats(brokerResponse);
requestContext.setQueryProcessingTime(totalTimeMs);
augmentStatistics(requestContext, brokerResponse);
return brokerResponse;
}
- //TODO: Remove this duplicate method, use the implementation from V1 engine
- private void attachMetadataToResponse(Map<String, String> stats, BrokerResponseNative brokerResponse) {
- brokerResponse.setNumDocsScanned(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName(), "0")));
- brokerResponse.setNumEntriesScannedInFilter(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), "0")));
- brokerResponse.setNumEntriesScannedPostFilter(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0")));
- brokerResponse.setTotalDocs(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.TOTAL_DOCS.getName(), "0")));
- brokerResponse.setNumSegmentsQueried(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName(), "0")));
- brokerResponse.setNumSegmentsProcessed(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0")));
- brokerResponse.setNumSegmentsMatched(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0")));
- brokerResponse.setNumConsumingSegmentsQueried(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), "0")));
-
- brokerResponse.setNumSegmentsPrunedByServer(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), "0")));
- brokerResponse.setNumSegmentsPrunedInvalid(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), "0")));
- brokerResponse.setNumSegmentsPrunedByLimit(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), "0")));
- brokerResponse.setNumSegmentsPrunedByValue(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), "0")));
- brokerResponse.setExplainPlanNumEmptyFilterSegments(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
- "0")));
- brokerResponse.setExplainPlanNumMatchAllFilterSegments(Long.parseLong(
- stats.getOrDefault(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(), "0")));
- brokerResponse.setNumConsumingSegmentsQueried(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), "0")));
- brokerResponse.setMinConsumingFreshnessTimeMs(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), "0")));
- brokerResponse.setNumConsumingSegmentsProcessed(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), "0")));
- brokerResponse.setNumConsumingSegmentsMatched(
- Long.parseLong(stats.getOrDefault(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), "0")));
- brokerResponse.setNumGroupsLimitReached(
- Boolean.parseBoolean(stats.getOrDefault(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "0")));
- }
-
private BrokerResponseNative constructMultistageExplainPlan(String sql, String plan) {
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
List<Object[]> rows = new ArrayList<>();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 9efac85c93..6de874dbda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -19,28 +19,15 @@
package org.apache.pinot.core.query.reduce;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.function.LongConsumer;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTable.MetadataKey;
-import org.apache.pinot.common.metrics.BrokerMeter;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
@@ -120,240 +107,4 @@ public abstract class BaseReduceService {
protected void shutDown() {
_reduceExecutorService.shutdownNow();
}
-
- protected static class ExecutionStatsAggregator {
- private final List<QueryProcessingException> _processingExceptions = new ArrayList<>();
- private final Map<String, String> _traceInfo = new HashMap<>();
- private final boolean _enableTrace;
-
- private long _numDocsScanned = 0L;
- private long _numEntriesScannedInFilter = 0L;
- private long _numEntriesScannedPostFilter = 0L;
- private long _numSegmentsQueried = 0L;
- private long _numSegmentsProcessed = 0L;
- private long _numSegmentsMatched = 0L;
- private long _numConsumingSegmentsQueried = 0L;
- private long _numConsumingSegmentsProcessed = 0L;
- private long _numConsumingSegmentsMatched = 0L;
- private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
- private long _numTotalDocs = 0L;
- private long _offlineThreadCpuTimeNs = 0L;
- private long _realtimeThreadCpuTimeNs = 0L;
- private long _offlineSystemActivitiesCpuTimeNs = 0L;
- private long _realtimeSystemActivitiesCpuTimeNs = 0L;
- private long _offlineResponseSerializationCpuTimeNs = 0L;
- private long _realtimeResponseSerializationCpuTimeNs = 0L;
- private long _offlineTotalCpuTimeNs = 0L;
- private long _realtimeTotalCpuTimeNs = 0L;
- private long _numSegmentsPrunedByServer = 0L;
- private long _numSegmentsPrunedInvalid = 0L;
- private long _numSegmentsPrunedByLimit = 0L;
- private long _numSegmentsPrunedByValue = 0L;
- private long _explainPlanNumEmptyFilterSegments = 0L;
- private long _explainPlanNumMatchAllFilterSegments = 0L;
- private boolean _numGroupsLimitReached = false;
-
- protected ExecutionStatsAggregator(boolean enableTrace) {
- _enableTrace = enableTrace;
- }
-
- protected synchronized void aggregate(ServerRoutingInstance routingInstance, DataTable dataTable) {
- Map<String, String> metadata = dataTable.getMetadata();
- // Reduce on trace info.
- if (_enableTrace) {
- _traceInfo.put(routingInstance.getShortName(), metadata.get(MetadataKey.TRACE_INFO.getName()));
- }
-
- // Reduce on exceptions.
- Map<Integer, String> exceptions = dataTable.getExceptions();
- for (int key : exceptions.keySet()) {
- _processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
- }
-
- // Reduce on execution statistics.
- String numDocsScannedString = metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
- if (numDocsScannedString != null) {
- _numDocsScanned += Long.parseLong(numDocsScannedString);
- }
- String numEntriesScannedInFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
- if (numEntriesScannedInFilterString != null) {
- _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
- }
- String numEntriesScannedPostFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
- if (numEntriesScannedPostFilterString != null) {
- _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
- }
- String numSegmentsQueriedString = metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
- if (numSegmentsQueriedString != null) {
- _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
- }
-
- String numSegmentsProcessedString = metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
- if (numSegmentsProcessedString != null) {
- _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
- }
- String numSegmentsMatchedString = metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
- if (numSegmentsMatchedString != null) {
- _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
- }
-
- String numConsumingSegmentsQueriedString = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName());
- if (numConsumingSegmentsQueriedString != null) {
- _numConsumingSegmentsQueried += Long.parseLong(numConsumingSegmentsQueriedString);
- }
-
- String numConsumingSegmentsProcessed = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
- if (numConsumingSegmentsProcessed != null) {
- _numConsumingSegmentsProcessed += Long.parseLong(numConsumingSegmentsProcessed);
- }
-
- String numConsumingSegmentsMatched = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName());
- if (numConsumingSegmentsMatched != null) {
- _numConsumingSegmentsMatched += Long.parseLong(numConsumingSegmentsMatched);
- }
-
- String minConsumingFreshnessTimeMsString = metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
- if (minConsumingFreshnessTimeMsString != null) {
- _minConsumingFreshnessTimeMs =
- Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
- }
-
- String threadCpuTimeNsString = metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
- if (threadCpuTimeNsString != null) {
- if (routingInstance.getTableType() == TableType.OFFLINE) {
- _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
- } else {
- _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
- }
- }
-
- String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
- if (systemActivitiesCpuTimeNsString != null) {
- if (routingInstance.getTableType() == TableType.OFFLINE) {
- _offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
- } else {
- _realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
- }
- }
-
- String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
- if (responseSerializationCpuTimeNsString != null) {
- if (routingInstance.getTableType() == TableType.OFFLINE) {
- _offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
- } else {
- _realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
- }
- }
- _offlineTotalCpuTimeNs =
- _offlineThreadCpuTimeNs + _offlineSystemActivitiesCpuTimeNs + _offlineResponseSerializationCpuTimeNs;
- _realtimeTotalCpuTimeNs =
- _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
-
- withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
- l -> _numSegmentsPrunedByServer += l);
- withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, l -> _numSegmentsPrunedInvalid += l);
- withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, l -> _numSegmentsPrunedByLimit += l);
- withNotNullLongMetadata(metadata, MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, l -> _numSegmentsPrunedByValue += l);
-
- String explainPlanNumEmptyFilterSegments =
- metadata.get(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
- if (explainPlanNumEmptyFilterSegments != null) {
- _explainPlanNumEmptyFilterSegments += Long.parseLong(explainPlanNumEmptyFilterSegments);
- }
-
- String explainPlanNumMatchAllFilterSegments =
- metadata.get(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName());
- if (explainPlanNumMatchAllFilterSegments != null) {
- _explainPlanNumMatchAllFilterSegments += Long.parseLong(explainPlanNumMatchAllFilterSegments);
- }
-
- String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName());
- if (numTotalDocsString != null) {
- _numTotalDocs += Long.parseLong(numTotalDocsString);
- }
- _numGroupsLimitReached |= Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
- }
-
- protected void setStats(String rawTableName, BrokerResponseNative brokerResponseNative,
- BrokerMetrics brokerMetrics) {
- // set exception
- List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
- processingExceptions.addAll(_processingExceptions);
-
- // add all trace.
- if (_enableTrace) {
- brokerResponseNative.getTraceInfo().putAll(_traceInfo);
- }
-
- // Set execution statistics.
- brokerResponseNative.setNumDocsScanned(_numDocsScanned);
- brokerResponseNative.setNumEntriesScannedInFilter(_numEntriesScannedInFilter);
- brokerResponseNative.setNumEntriesScannedPostFilter(_numEntriesScannedPostFilter);
- brokerResponseNative.setNumSegmentsQueried(_numSegmentsQueried);
- brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
- brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
- brokerResponseNative.setTotalDocs(_numTotalDocs);
- brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
- brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
- brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
- brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
- brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(_realtimeSystemActivitiesCpuTimeNs);
- brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(_offlineResponseSerializationCpuTimeNs);
- brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(_realtimeResponseSerializationCpuTimeNs);
- brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
- brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
- brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer);
- brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid);
- brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit);
- brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue);
- brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments);
- brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
- if (_numConsumingSegmentsQueried > 0) {
- brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
- }
- if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
- brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
- }
- brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
- brokerResponseNative.setNumConsumingSegmentsMatched(_numConsumingSegmentsMatched);
-
- // Update broker metrics.
- if (brokerMetrics != null) {
- brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, _numDocsScanned);
- brokerMetrics
- .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, _numEntriesScannedInFilter);
- brokerMetrics
- .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, _numEntriesScannedPostFilter);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, _offlineThreadCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS,
- _realtimeThreadCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
- _offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
- _realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
- _offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
- _realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, _offlineTotalCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
- TimeUnit.NANOSECONDS);
-
- if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
- System.currentTimeMillis() - _minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
- }
- }
- }
-
- private void withNotNullLongMetadata(Map<String, String> metadata, MetadataKey key, LongConsumer consumer) {
- String strValue = metadata.get(key.getName());
- if (strValue != null) {
- consumer.accept(Long.parseLong(strValue));
- }
- }
- }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
new file mode 100644
index 0000000000..0c93252c51
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+
+
+public class ExecutionStatsAggregator {
+ private final List<QueryProcessingException> _processingExceptions = new ArrayList<>();
+ private final Map<String, String> _traceInfo = new HashMap<>();
+ private final boolean _enableTrace;
+
+ private long _numDocsScanned = 0L;
+ private long _numEntriesScannedInFilter = 0L;
+ private long _numEntriesScannedPostFilter = 0L;
+ private long _numSegmentsQueried = 0L;
+ private long _numSegmentsProcessed = 0L;
+ private long _numSegmentsMatched = 0L;
+ private long _numConsumingSegmentsQueried = 0L;
+ private long _numConsumingSegmentsProcessed = 0L;
+ private long _numConsumingSegmentsMatched = 0L;
+ private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+ private long _numTotalDocs = 0L;
+ private long _offlineThreadCpuTimeNs = 0L;
+ private long _realtimeThreadCpuTimeNs = 0L;
+ private long _offlineSystemActivitiesCpuTimeNs = 0L;
+ private long _realtimeSystemActivitiesCpuTimeNs = 0L;
+ private long _offlineResponseSerializationCpuTimeNs = 0L;
+ private long _realtimeResponseSerializationCpuTimeNs = 0L;
+ private long _offlineTotalCpuTimeNs = 0L;
+ private long _realtimeTotalCpuTimeNs = 0L;
+ private long _numSegmentsPrunedByServer = 0L;
+ private long _numSegmentsPrunedInvalid = 0L;
+ private long _numSegmentsPrunedByLimit = 0L;
+ private long _numSegmentsPrunedByValue = 0L;
+ private long _explainPlanNumEmptyFilterSegments = 0L;
+ private long _explainPlanNumMatchAllFilterSegments = 0L;
+ private boolean _numGroupsLimitReached = false;
+
+ public ExecutionStatsAggregator(boolean enableTrace) {
+ _enableTrace = enableTrace;
+ }
+
+ public void aggregate(ServerRoutingInstance routingInstance, DataTable dataTable) {
+ aggregate(routingInstance, dataTable.getMetadata(), dataTable.getExceptions());
+ }
+
+ public synchronized void aggregate(@Nullable ServerRoutingInstance routingInstance, Map<String, String> metadata,
+ Map<Integer, String> exceptions) {
+ // Reduce on trace info.
+ if (_enableTrace) {
+ _traceInfo.put(routingInstance.getShortName(), metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
+ }
+
+ // Reduce on exceptions.
+ for (int key : exceptions.keySet()) {
+ _processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
+ }
+
+ // Reduce on execution statistics.
+ String numDocsScannedString = metadata.get(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName());
+ if (numDocsScannedString != null) {
+ _numDocsScanned += Long.parseLong(numDocsScannedString);
+ }
+ String numEntriesScannedInFilterString =
+ metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
+ if (numEntriesScannedInFilterString != null) {
+ _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
+ }
+ String numEntriesScannedPostFilterString =
+ metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
+ if (numEntriesScannedPostFilterString != null) {
+ _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
+ }
+ String numSegmentsQueriedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName());
+ if (numSegmentsQueriedString != null) {
+ _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
+ }
+
+ String numSegmentsProcessedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
+ if (numSegmentsProcessedString != null) {
+ _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
+ }
+ String numSegmentsMatchedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName());
+ if (numSegmentsMatchedString != null) {
+ _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
+ }
+
+ String numConsumingSegmentsQueriedString =
+ metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName());
+ if (numConsumingSegmentsQueriedString != null) {
+ _numConsumingSegmentsQueried += Long.parseLong(numConsumingSegmentsQueriedString);
+ }
+
+ String numConsumingSegmentsProcessed =
+ metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
+ if (numConsumingSegmentsProcessed != null) {
+ _numConsumingSegmentsProcessed += Long.parseLong(numConsumingSegmentsProcessed);
+ }
+
+ String numConsumingSegmentsMatched = metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName());
+ if (numConsumingSegmentsMatched != null) {
+ _numConsumingSegmentsMatched += Long.parseLong(numConsumingSegmentsMatched);
+ }
+
+ String minConsumingFreshnessTimeMsString =
+ metadata.get(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
+ if (minConsumingFreshnessTimeMsString != null) {
+ _minConsumingFreshnessTimeMs =
+ Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
+ }
+
+ String threadCpuTimeNsString = metadata.get(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName());
+ if (routingInstance != null && threadCpuTimeNsString != null) {
+ if (routingInstance.getTableType() == TableType.OFFLINE) {
+ _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+ } else {
+ _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+ }
+ }
+
+ String systemActivitiesCpuTimeNsString =
+ metadata.get(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
+ if (routingInstance != null && systemActivitiesCpuTimeNsString != null) {
+ if (routingInstance.getTableType() == TableType.OFFLINE) {
+ _offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+ } else {
+ _realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+ }
+ }
+
+ String responseSerializationCpuTimeNsString =
+ metadata.get(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+ if (routingInstance != null && responseSerializationCpuTimeNsString != null) {
+ if (routingInstance.getTableType() == TableType.OFFLINE) {
+ _offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+ } else {
+ _realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+ }
+ }
+ _offlineTotalCpuTimeNs =
+ _offlineThreadCpuTimeNs + _offlineSystemActivitiesCpuTimeNs + _offlineResponseSerializationCpuTimeNs;
+ _realtimeTotalCpuTimeNs =
+ _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
+
+ withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
+ l -> _numSegmentsPrunedByServer += l);
+ withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID,
+ l -> _numSegmentsPrunedInvalid += l);
+ withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT,
+ l -> _numSegmentsPrunedByLimit += l);
+ withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE,
+ l -> _numSegmentsPrunedByValue += l);
+
+ String explainPlanNumEmptyFilterSegments =
+ metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
+ if (explainPlanNumEmptyFilterSegments != null) {
+ _explainPlanNumEmptyFilterSegments += Long.parseLong(explainPlanNumEmptyFilterSegments);
+ }
+
+ String explainPlanNumMatchAllFilterSegments =
+ metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName());
+ if (explainPlanNumMatchAllFilterSegments != null) {
+ _explainPlanNumMatchAllFilterSegments += Long.parseLong(explainPlanNumMatchAllFilterSegments);
+ }
+
+ String numTotalDocsString = metadata.get(DataTable.MetadataKey.TOTAL_DOCS.getName());
+ if (numTotalDocsString != null) {
+ _numTotalDocs += Long.parseLong(numTotalDocsString);
+ }
+ _numGroupsLimitReached |=
+ Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+ }
+
+ public void setStats(BrokerResponseNative brokerResponseNative) {
+ setStats(null, brokerResponseNative, null);
+ }
+
+ public void setStats(@Nullable String rawTableName, BrokerResponseNative brokerResponseNative,
+ @Nullable BrokerMetrics brokerMetrics) {
+ // Append the processing exceptions collected during aggregation to the response.
+ List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
+ processingExceptions.addAll(_processingExceptions);
+
+ // Add all collected trace info when tracing is enabled.
+ if (_enableTrace) {
+ brokerResponseNative.getTraceInfo().putAll(_traceInfo);
+ }
+
+ // Set execution statistics.
+ brokerResponseNative.setNumDocsScanned(_numDocsScanned);
+ brokerResponseNative.setNumEntriesScannedInFilter(_numEntriesScannedInFilter);
+ brokerResponseNative.setNumEntriesScannedPostFilter(_numEntriesScannedPostFilter);
+ brokerResponseNative.setNumSegmentsQueried(_numSegmentsQueried);
+ brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
+ brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
+ brokerResponseNative.setTotalDocs(_numTotalDocs);
+ brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
+ brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
+ brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
+ brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
+ brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(_realtimeSystemActivitiesCpuTimeNs);
+ brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(_offlineResponseSerializationCpuTimeNs);
+ brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(_realtimeResponseSerializationCpuTimeNs);
+ brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
+ brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
+ brokerResponseNative.setNumSegmentsPrunedByServer(_numSegmentsPrunedByServer);
+ brokerResponseNative.setNumSegmentsPrunedInvalid(_numSegmentsPrunedInvalid);
+ brokerResponseNative.setNumSegmentsPrunedByLimit(_numSegmentsPrunedByLimit);
+ brokerResponseNative.setNumSegmentsPrunedByValue(_numSegmentsPrunedByValue);
+ brokerResponseNative.setExplainPlanNumEmptyFilterSegments(_explainPlanNumEmptyFilterSegments);
+ brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
+ if (_numConsumingSegmentsQueried > 0) {
+ brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
+ }
+ if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
+ brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
+ }
+ brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
+ brokerResponseNative.setNumConsumingSegmentsMatched(_numConsumingSegmentsMatched);
+
+ // Update broker metrics.
+ if (brokerMetrics != null && rawTableName != null) {
+ brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, _numDocsScanned);
+ brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER,
+ _numEntriesScannedInFilter);
+ brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER,
+ _numEntriesScannedPostFilter);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, _offlineThreadCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS, _realtimeThreadCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+ _offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+ _realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
+ _offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
+ _realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, _offlineTotalCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+
+ if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
+ System.currentTimeMillis() - _minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.MetadataKey key, LongConsumer consumer) {
+ String strValue = metadata.get(key.getName());
+ if (strValue != null) {
+ consumer.accept(Long.parseLong(strValue));
+ }
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
index 9946ddfa15..a48b12563d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
@@ -56,7 +56,7 @@ public class StreamingReduceServiceTest {
threadPoolService,
ImmutableMap.of(routingInstance, mockedResponse),
1000,
- mock(BaseReduceService.ExecutionStatsAggregator.class));
+ mock(ExecutionStatsAggregator.class));
return null;
}, cause -> cause.getMessage().contains(exceptionMessage))
);
@@ -85,7 +85,7 @@ public class StreamingReduceServiceTest {
threadPoolService,
ImmutableMap.of(routingInstance, mockedResponse),
10,
- mock(BaseReduceService.ExecutionStatsAggregator.class));
+ mock(ExecutionStatsAggregator.class));
return null;
},
(cause) -> cause instanceof TimeoutException));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d171336a52..3846e4d424 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -154,8 +154,9 @@ public class QueryRunner {
StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(_mailboxService,
new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema(), requestId,
- sendNode.getStageId()), receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
- sendNode.getPartitionKeySelector(), _rootServer, serverQueryRequests.get(0).getRequestId(),
+ sendNode.getStageId(), _rootServer), receivingStageMetadata.getServerInstances(),
+ sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _rootServer,
+ serverQueryRequests.get(0).getRequestId(),
sendNode.getStageId(), sendNode.getReceiverStageId());
int blockCounter = 0;
while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 5b1e1e1de9..6b3babca82 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
@@ -81,16 +82,18 @@ public class AggregateOperator extends MultiStageOperator {
// groupSet has to be a list of InputRef and cannot be null
// TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
- List<RexExpression> groupSet, DataSchema inputSchema, long requestId, int stageId) {
+ List<RexExpression> groupSet, DataSchema inputSchema, long requestId, int stageId,
+ VirtualServerAddress virtualServerAddress) {
this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.Accumulator.MERGERS, requestId,
- stageId);
+ stageId, virtualServerAddress);
}
@VisibleForTesting
AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
List<RexExpression> groupSet, DataSchema inputSchema,
- Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId) {
- super(requestId, stageId);
+ Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long requestId, int stageId,
+ VirtualServerAddress serverAddress) {
+ super(requestId, stageId, serverAddress);
_inputOperator = inputOperator;
_groupSet = groupSet;
_upstreamErrorBlock = null;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index ba6f64f19e..0b819fdd46 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
@@ -55,8 +56,8 @@ public class FilterOperator extends MultiStageOperator {
private TransferableBlock _upstreamErrorBlock;
public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter,
- long requestId, int stageId) {
- super(requestId, stageId);
+ long requestId, int stageId, VirtualServerAddress serverAddress) {
+ super(requestId, stageId, serverAddress);
_upstreamOperator = upstreamOperator;
_dataSchema = dataSchema;
_filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 1c27281a96..a813f35db3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
@@ -88,8 +89,8 @@ public class HashJoinOperator extends MultiStageOperator {
private KeySelector<Object[], Object[]> _rightKeySelector;
public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator,
- DataSchema leftSchema, JoinNode node, long requestId, int stageId) {
- super(requestId, stageId);
+ DataSchema leftSchema, JoinNode node, long requestId, int stageId, VirtualServerAddress serverAddress) {
+ super(requestId, stageId, serverAddress);
Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
"Join type: " + node.getJoinRelType() + " is not supported!");
_joinType = node.getJoinRelType();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 1e11b51953..d2b9c4db16 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +68,8 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
private int _currentIndex;
public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema,
- long requestId, int stageId) {
- super(requestId, stageId);
+ long requestId, int stageId, VirtualServerAddress serverAddress) {
+ super(requestId, stageId, serverAddress);
_baseResultBlock = baseResultBlock;
_desiredDataSchema = dataSchema;
_errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
@@ -86,7 +87,7 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
@Nullable
@Override
public String toExplainString() {
- return EXPLAIN_NAME;
+ return EXPLAIN_NAME;
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 6e123dd6b1..36d3fe511f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
@@ -40,8 +41,8 @@ public class LiteralValueOperator extends MultiStageOperator {
private boolean _isLiteralBlockReturned;
public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows,
- long requestId, int stageId) {
- super(requestId, stageId);
+ long requestId, int stageId, VirtualServerAddress serverAddress) {
+ super(requestId, stageId, serverAddress);
_dataSchema = dataSchema;
_rexLiteralBlock = constructBlock(rexLiteralRows);
_isLiteralBlockReturned = false;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 3062b8cd38..0b16a47229 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -80,7 +80,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
long jobId, int senderStageId, int receiverStageId, Long timeoutMs) {
- super(jobId, senderStageId);
+ super(jobId, senderStageId, receiver);
_mailboxService = mailboxService;
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
"Exchange/Distribution type: " + exchangeType + " is not supported!");
@@ -164,7 +164,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
return block;
} else {
if (!block.getResultMetadata().isEmpty()) {
- _operatorStats.clearExecutionStats();
_operatorStatsMap.putAll(block.getResultMetadata());
}
eosMailboxCount++;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index e26c46f850..504a0d997b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -73,7 +73,7 @@ public class MailboxSendOperator extends MultiStageOperator {
VirtualServerAddress sendingServer, long jobId, int senderStageId, int receiverStageId) {
this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
server -> toMailboxId(server, jobId, senderStageId, receiverStageId, sendingServer), BlockExchange::getExchange,
- jobId, senderStageId, receiverStageId);
+ jobId, senderStageId, receiverStageId, sendingServer);
}
@VisibleForTesting
@@ -81,8 +81,8 @@ public class MailboxSendOperator extends MultiStageOperator {
MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int senderStageId,
- int receiverStageId) {
- super(jobId, senderStageId);
+ int receiverStageId, VirtualServerAddress serverAddress) {
+ super(jobId, senderStageId, serverAddress);
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
List<MailboxIdentifier> receivingMailboxes;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index b55b42145a..d9c0104d91 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
@@ -38,14 +39,18 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
// TODO: Move to OperatorContext class.
protected final long _requestId;
protected final int _stageId;
+ protected final VirtualServerAddress _serverAddress;
protected final OperatorStats _operatorStats;
protected final Map<String, OperatorStats> _operatorStatsMap;
+ private final String _operatorId;
- public MultiStageOperator(long requestId, int stageId) {
+ public MultiStageOperator(long requestId, int stageId, VirtualServerAddress serverAddress) {
_requestId = requestId;
_stageId = stageId;
- _operatorStats = new OperatorStats(requestId, stageId, toExplainString());
+ _operatorStats = new OperatorStats(requestId, stageId, serverAddress, toExplainString());
+ _serverAddress = serverAddress;
_operatorStatsMap = new HashMap<>();
+ _operatorId = Joiner.on("_").join(toExplainString(), _requestId, _stageId, _serverAddress);
}
public Map<String, OperatorStats> getOperatorStatsMap() {
@@ -62,7 +67,6 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
TransferableBlock nextBlock = getNextBlock();
_operatorStats.recordRow(1, nextBlock.getNumRows());
_operatorStats.endTimer();
- // TODO: move this to centralized reporting in broker
if (nextBlock.isEndOfStreamBlock()) {
if (nextBlock.isSuccessfulEndOfStreamBlock()) {
for (MultiStageOperator op : getChildOperators()) {
@@ -70,8 +74,7 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
}
if (!_operatorStats.getExecutionStats().isEmpty()) {
- String operatorId = Joiner.on("_").join(toExplainString(), _requestId, _stageId);
- _operatorStatsMap.put(operatorId, _operatorStats);
+ _operatorStatsMap.put(_operatorId, _operatorStats);
}
return TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_operatorStatsMap));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 89456ecf9c..09a8d7ac60 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -19,9 +19,12 @@
package org.apache.pinot.query.runtime.operator;
import com.google.common.base.Stopwatch;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.query.runtime.operator.utils.StatsAggregator;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
+
public class OperatorStats {
private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted();
@@ -29,18 +32,20 @@ public class OperatorStats {
// TODO: add a operatorId for better tracking purpose.
private final int _stageId;
private final long _requestId;
+ private final VirtualServerAddress _serverAddress;
private final String _operatorType;
private int _numBlock = 0;
private int _numRows = 0;
- private StatsAggregator _statsAggregator;
+ private Map<String, String> _executionStats;
- public OperatorStats(long requestId, int stageId, String operatorType) {
+ public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) {
_stageId = stageId;
_requestId = requestId;
+ _serverAddress = serverAddress;
_operatorType = operatorType;
- _statsAggregator = new StatsAggregator();
+ _executionStats = new HashMap<>();
}
public void startTimer() {
@@ -61,11 +66,15 @@ public class OperatorStats {
}
public void recordExecutionStats(Map<String, String> executionStats) {
- _statsAggregator.aggregate(executionStats);
+ _executionStats = executionStats;
}
public Map<String, String> getExecutionStats() {
- return _statsAggregator.getStats();
+ _executionStats.put(OperatorUtils.NUM_BLOCKS, String.valueOf(_numBlock));
+ _executionStats.put(OperatorUtils.NUM_ROWS, String.valueOf(_numRows));
+ _executionStats.put(OperatorUtils.THREAD_EXECUTION_TIME,
+ String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
+ return _executionStats;
}
public int getStageId() {
@@ -76,19 +85,16 @@ public class OperatorStats {
return _requestId;
}
- public String getOperatorType() {
- return _operatorType;
+ public VirtualServerAddress getServerAddress() {
+ return _serverAddress;
}
- public void clearExecutionStats() {
- _statsAggregator = new StatsAggregator();
+ public String getOperatorType() {
+ return _operatorType;
}
- // TODO: Return the string as a JSON string.
@Override
public String toString() {
- return String.format(
- "OperatorStats[requestId: %s, stageId %s, type: %s] ExecutionWallTime: %sms, No. Rows: %s, No. Block: %s",
- _requestId, _stageId, _operatorType, _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numRows, _numBlock);
+ return OperatorUtils.operatorStatsToJson(this);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index e2ce8f6499..2298dbb4c4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
@@ -55,16 +56,16 @@ public class SortOperator extends MultiStageOperator {
public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
- long requestId, int stageId) {
+ long requestId, int stageId, VirtualServerAddress serverAddress) {
this(upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema,
- SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, stageId);
+ SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, stageId, serverAddress);
}
@VisibleForTesting
SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
- int defaultHolderCapacity, long requestId, int stageId) {
- super(requestId, stageId);
+ int defaultHolderCapacity, long requestId, int stageId, VirtualServerAddress serverAddress) {
+ super(requestId, stageId, serverAddress);
_upstreamOperator = upstreamOperator;
_fetch = fetch;
_offset = Math.max(offset, 0);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 691aed44fe..03f54ee6f4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
@@ -54,9 +55,9 @@ public class TransformOperator extends MultiStageOperator {
private final DataSchema _resultSchema;
private TransferableBlock _upstreamErrorBlock;
- public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema,
- List<RexExpression> transforms, DataSchema upstreamDataSchema, long requestId, int stageId) {
- super(requestId, stageId);
+ public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema, List<RexExpression> transforms,
+ DataSchema upstreamDataSchema, long requestId, int stageId, VirtualServerAddress serverAddress) {
+ super(requestId, stageId, serverAddress);
Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty.");
Preconditions.checkState(resultSchema.size() == transforms.size(),
"result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index a20fd658d1..532befd702 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
@@ -31,6 +32,10 @@ import org.slf4j.LoggerFactory;
public class OperatorUtils {
+ public static final String NUM_BLOCKS = "numBlocks";
+ public static final String NUM_ROWS = "numRows";
+ public static final String THREAD_EXECUTION_TIME = "threadExecutionTime";
+
private static final Logger LOGGER = LoggerFactory.getLogger(OperatorUtils.class);
private static final Map<String, String> OPERATOR_TOKEN_MAPPING = new HashMap<>();
@@ -69,6 +74,7 @@ public class OperatorUtils {
Map<String, Object> jsonOut = new HashMap<>();
jsonOut.put("requestId", operatorStats.getRequestId());
jsonOut.put("stageId", operatorStats.getStageId());
+ jsonOut.put("serverAddress", operatorStats.getServerAddress().toString());
jsonOut.put("operatorType", operatorStats.getOperatorType());
jsonOut.put("executionStats", operatorStats.getExecutionStats());
return JsonUtils.objectToString(jsonOut);
@@ -83,9 +89,12 @@ public class OperatorUtils {
JsonNode operatorStatsNode = JsonUtils.stringToJsonNode(json);
long requestId = operatorStatsNode.get("requestId").asLong();
int stageId = operatorStatsNode.get("stageId").asInt();
+ String serverAddressStr = operatorStatsNode.get("serverAddress").asText();
+ VirtualServerAddress serverAddress = VirtualServerAddress.parse(serverAddressStr);
String operatorType = operatorStatsNode.get("operatorType").asText();
- OperatorStats operatorStats = new OperatorStats(requestId, stageId, operatorType);
+ OperatorStats operatorStats =
+ new OperatorStats(requestId, stageId, serverAddress, operatorType);
operatorStats.recordExecutionStats(
JsonUtils.jsonNodeToObject(operatorStatsNode.get("executionStats"), new TypeReference<Map<String, String>>() {
}));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/StatsAggregator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/StatsAggregator.java
deleted file mode 100644
index 79b4c87ed0..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/StatsAggregator.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.operator.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.LongConsumer;
-import org.apache.pinot.common.datatable.DataTable;
-
-//TODO: Remove this and use BaseReduceService.ExecutionStatsAggregator
-public class StatsAggregator {
- private long _numDocsScanned = 0L;
- private long _numEntriesScannedInFilter = 0L;
- private long _numEntriesScannedPostFilter = 0L;
- private long _numSegmentsQueried = 0L;
- private long _numSegmentsProcessed = 0L;
- private long _numSegmentsMatched = 0L;
- private long _numConsumingSegmentsQueried = 0L;
- private long _numConsumingSegmentsProcessed = 0L;
- private long _numConsumingSegmentsMatched = 0L;
- private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
- private long _numTotalDocs = 0L;
- private long _numSegmentsPrunedByServer = 0L;
- private long _numSegmentsPrunedInvalid = 0L;
- private long _numSegmentsPrunedByLimit = 0L;
- private long _numSegmentsPrunedByValue = 0L;
- private long _explainPlanNumEmptyFilterSegments = 0L;
- private long _explainPlanNumMatchAllFilterSegments = 0L;
- private boolean _numGroupsLimitReached = false;
-
- public synchronized void aggregate(Map<String, String> metadata) {
- // Reduce on execution statistics.
- String numDocsScannedString = metadata.get(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName());
- if (numDocsScannedString != null) {
- _numDocsScanned += Long.parseLong(numDocsScannedString);
- }
- String numEntriesScannedInFilterString =
- metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
- if (numEntriesScannedInFilterString != null) {
- _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
- }
- String numEntriesScannedPostFilterString =
- metadata.get(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
- if (numEntriesScannedPostFilterString != null) {
- _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
- }
- String numSegmentsQueriedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName());
- if (numSegmentsQueriedString != null) {
- _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
- }
-
- String numSegmentsProcessedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
- if (numSegmentsProcessedString != null) {
- _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
- }
- String numSegmentsMatchedString = metadata.get(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName());
- if (numSegmentsMatchedString != null) {
- _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
- }
-
- String numConsumingSegmentsQueriedString =
- metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName());
- if (numConsumingSegmentsQueriedString != null) {
- _numConsumingSegmentsQueried += Long.parseLong(numConsumingSegmentsQueriedString);
- }
-
- String numConsumingSegmentsProcessed =
- metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
- if (numConsumingSegmentsProcessed != null) {
- _numConsumingSegmentsProcessed += Long.parseLong(numConsumingSegmentsProcessed);
- }
-
- String numConsumingSegmentsMatched = metadata.get(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName());
- if (numConsumingSegmentsMatched != null) {
- _numConsumingSegmentsMatched += Long.parseLong(numConsumingSegmentsMatched);
- }
-
- String minConsumingFreshnessTimeMsString =
- metadata.get(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
- if (minConsumingFreshnessTimeMsString != null) {
- _minConsumingFreshnessTimeMs =
- Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
- }
-
- withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER,
- l -> _numSegmentsPrunedByServer += l);
- withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID,
- l -> _numSegmentsPrunedInvalid += l);
- withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT,
- l -> _numSegmentsPrunedByLimit += l);
- withNotNullLongMetadata(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE,
- l -> _numSegmentsPrunedByValue += l);
-
- String explainPlanNumEmptyFilterSegments =
- metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName());
- if (explainPlanNumEmptyFilterSegments != null) {
- _explainPlanNumEmptyFilterSegments += Long.parseLong(explainPlanNumEmptyFilterSegments);
- }
-
- String explainPlanNumMatchAllFilterSegments =
- metadata.get(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName());
- if (explainPlanNumMatchAllFilterSegments != null) {
- _explainPlanNumMatchAllFilterSegments += Long.parseLong(explainPlanNumMatchAllFilterSegments);
- }
-
- String numTotalDocsString = metadata.get(DataTable.MetadataKey.TOTAL_DOCS.getName());
- if (numTotalDocsString != null) {
- _numTotalDocs += Long.parseLong(numTotalDocsString);
- }
- _numGroupsLimitReached |=
- Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
- }
-
- public Map<String, String> getStats() {
- Map<String, String> metadata = new HashMap<>();
- metadata.put(DataTable.MetadataKey.NUM_DOCS_SCANNED.getName(), String.valueOf(_numDocsScanned));
- metadata.put(DataTable.MetadataKey.TOTAL_DOCS.getName(), String.valueOf(_numTotalDocs));
- metadata.put(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
- String.valueOf(_numEntriesScannedInFilter));
- metadata.put(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
- String.valueOf(_numEntriesScannedPostFilter));
- metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED.getName(), String.valueOf(_numSegmentsQueried));
- metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(_numSegmentsProcessed));
- metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED.getName(), String.valueOf(_numSegmentsMatched));
- metadata.put(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
- String.valueOf(_numConsumingSegmentsQueried));
- metadata.put(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
- String.valueOf(_numConsumingSegmentsProcessed));
- metadata.put(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
- String.valueOf(_numConsumingSegmentsMatched));
- metadata.put(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
- String.valueOf(_minConsumingFreshnessTimeMs));
- metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(),
- String.valueOf(_numSegmentsPrunedByServer));
- metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
- String.valueOf(_numSegmentsPrunedByLimit));
- metadata.put(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
- String.valueOf(_numSegmentsPrunedInvalid));
- metadata.put(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
- String.valueOf(_explainPlanNumEmptyFilterSegments));
- metadata.put(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
- String.valueOf(_explainPlanNumMatchAllFilterSegments));
- metadata.put(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), String.valueOf(_numGroupsLimitReached));
-
- return metadata;
- }
-
- private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.MetadataKey key, LongConsumer consumer) {
- String strValue = metadata.get(key.getName());
- if (strValue != null) {
- consumer.accept(Long.parseLong(strValue));
- }
- }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index fb2de69d89..ea9c2c38e0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -85,8 +85,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
- return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(),
- node.getGroupSet(), node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId);
+ return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(), node.getGroupSet(),
+ node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId, context.getServer());
}
@Override
@@ -99,7 +99,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition(), context.getRequestId(),
- context.getStageId());
+ context.getStageId(), context.getServer());
}
@Override
@@ -111,21 +111,21 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
MultiStageOperator rightOperator = right.visit(this, context);
return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node, context.getRequestId(),
- context.getStageId());
+ context.getStageId(), context.getServer());
}
@Override
public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(),
- node.getInputs().get(0).getDataSchema(), context.getRequestId(), context.getStageId());
+ node.getInputs().get(0).getDataSchema(), context.getRequestId(), context.getStageId(), context.getServer());
}
@Override
public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
- return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(),
- node.getFetch(), node.getOffset(), node.getDataSchema(), context.getRequestId(), context.getStageId());
+ return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(), node.getFetch(),
+ node.getOffset(), node.getDataSchema(), context.getRequestId(), context.getStageId(), context.getServer());
}
@Override
@@ -136,6 +136,6 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) {
return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows(), context.getRequestId(),
- context.getStageId());
+ context.getStageId(), context.getServer());
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index b0f959fcd7..9c7b73a9e0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.util.Pair;
import org.apache.pinot.common.datablock.DataBlock;
@@ -36,6 +37,7 @@ import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
@@ -47,7 +49,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
-import org.apache.pinot.query.runtime.operator.utils.StatsAggregator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.roaringbitmap.RoaringBitmap;
@@ -66,15 +67,9 @@ public class QueryDispatcher {
public QueryDispatcher() {
}
- public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
- MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions)
- throws Exception {
- return submitAndReduce(requestId, queryPlan, mailboxService, timeoutMs, queryOptions, new HashMap<>());
- }
-
public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions,
- Map<String, String> metadata)
+ ExecutionStatsAggregator executionStatsAggregator)
throws Exception {
// submit all the distributed stages.
int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
@@ -84,7 +79,8 @@ public class QueryDispatcher {
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId,
reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(),
new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs);
- List<DataBlock> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, metadata);
+ List<DataBlock> resultDataBlocks =
+ reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, executionStatsAggregator);
return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
queryPlan.getQueryStageMap().get(0).getDataSchema());
}
@@ -132,29 +128,11 @@ public class QueryDispatcher {
}
public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs) {
- List<DataBlock> resultDataBlocks = new ArrayList<>();
- TransferableBlock transferableBlock;
- long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
- while (System.nanoTime() < timeoutWatermark) {
- transferableBlock = mailboxReceiveOperator.nextBlock();
- if (TransferableBlockUtils.isEndOfStream(transferableBlock) && transferableBlock.isErrorBlock()) {
- // TODO: we only received bubble up error from the execution stage tree.
- // TODO: query dispatch should also send cancel signal to the rest of the execution stage tree.
- throw new RuntimeException(
- "Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions());
- }
- if (transferableBlock.isNoOpBlock()) {
- continue;
- } else if (transferableBlock.isEndOfStreamBlock()) {
- return resultDataBlocks;
- }
- resultDataBlocks.add(transferableBlock.getDataBlock());
- }
- throw new RuntimeException("Timed out while receiving from mailbox: " + QueryException.EXECUTION_TIMEOUT_ERROR);
+ return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null);
}
public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
- Map<String, String> metadata) {
+ @Nullable ExecutionStatsAggregator executionStatsAggregator) {
List<DataBlock> resultDataBlocks = new ArrayList<>();
TransferableBlock transferableBlock;
long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
@@ -169,13 +147,13 @@ public class QueryDispatcher {
if (transferableBlock.isNoOpBlock()) {
continue;
} else if (transferableBlock.isEndOfStreamBlock()) {
- StatsAggregator statsAggregator = new StatsAggregator();
- for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) {
- LOGGER.info("Broker Query Execution Stats, OperatorId: {}, OperatorStats: {}", entry.getKey(),
- OperatorUtils.operatorStatsToJson(entry.getValue()));
- statsAggregator.aggregate(entry.getValue().getExecutionStats());
+ if (executionStatsAggregator != null) {
+ for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) {
+ LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(),
+ OperatorUtils.operatorStatsToJson(entry.getValue()));
+ executionStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+ }
}
- metadata.putAll(statsAggregator.getStats());
return resultDataBlocks;
}
resultDataBlocks.add(transferableBlock.getDataBlock());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 6ab4bc3a3e..1ed74db296 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -163,14 +163,14 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
@Test(dataProvider = "testDataWithSqlToFinalRowCount")
public void testSqlWithFinalRowCountChecker(String sql, int expectedRows)
throws Exception {
- List<Object[]> resultRows = queryRunner(sql);
+ List<Object[]> resultRows = queryRunner(sql, null);
Assert.assertEquals(resultRows.size(), expectedRows);
}
@Test(dataProvider = "testSql")
public void testSqlWithH2Checker(String sql)
throws Exception {
- List<Object[]> resultRows = queryRunner(sql);
+ List<Object[]> resultRows = queryRunner(sql, null);
// query H2 for data
List<Object[]> expectedRows = queryH2(sql);
compareRowEquals(resultRows, expectedRows);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 412b622c5d..6ebc8cdb1c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryServerEnclosure;
@@ -81,7 +82,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
// --------------------------------------------------------------------------
// QUERY UTILS
// --------------------------------------------------------------------------
- protected List<Object[]> queryRunner(String sql) {
+ protected List<Object[]> queryRunner(String sql, ExecutionStatsAggregator executionStatsAggregator) {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
Map<String, String> requestMetadataMap =
ImmutableMap.of(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()),
@@ -107,7 +108,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
}
Preconditions.checkNotNull(mailboxReceiveOperator);
return QueryDispatcher.toResultTable(
- QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS),
+ QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
+ executionStatsAggregator),
queryPlan.getQueryResultFields(), queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 0bda46470e..6cd0127b76 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.spi.data.FieldSpec;
@@ -49,9 +50,13 @@ public class AggregateOperatorTest {
@Mock
private MultiStageOperator _input;
+ @Mock
+ private VirtualServerAddress _serverAddress;
+
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
}
@AfterMethod
@@ -71,7 +76,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
// When:
TransferableBlock block1 = operator.nextBlock(); // build
@@ -91,7 +96,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
// When:
TransferableBlock block = operator.nextBlock();
@@ -113,7 +118,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
// When:
TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block
@@ -136,7 +141,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -161,7 +166,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
// When:
TransferableBlock block1 = operator.nextBlock();
@@ -193,7 +198,8 @@ public class AggregateOperatorTest {
Mockito.when(merger.initialize(Mockito.any(), Mockito.any())).thenReturn(1d);
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
AggregateOperator operator =
- new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of("SUM", cdt -> merger), 1, 2);
+ new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of("SUM", cdt -> merger), 1, 2,
+ _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock(); // (output result)
@@ -215,7 +221,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
AggregateOperator sum0GroupBy1 =
new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
- Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema, 1, 2);
+ Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema, 1, 2, _serverAddress);
TransferableBlock result = sum0GroupBy1.getNextBlock();
while (result.isNoOpBlock()) {
result = sum0GroupBy1.getNextBlock();
@@ -238,7 +244,7 @@ public class AggregateOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE});
// When:
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
}
@Test
@@ -255,7 +261,7 @@ public class AggregateOperatorTest {
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
- AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2);
+ AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress);
// When:
TransferableBlock block = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index cf5cb80151..26e40cc4ab 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.spi.data.FieldSpec;
@@ -42,9 +43,13 @@ public class FilterOperatorTest {
@Mock
private MultiStageOperator _upstreamOperator;
+ @Mock
+ private VirtualServerAddress _serverAddress;
+
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
}
@AfterMethod
@@ -61,7 +66,7 @@ public class FilterOperatorTest {
DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.BOOLEAN
});
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
TransferableBlock errorBlock = op.getNextBlock();
Assert.assertTrue(errorBlock.isErrorBlock());
DataBlock error = errorBlock.getDataBlock();
@@ -76,7 +81,7 @@ public class FilterOperatorTest {
DataSchema.ColumnDataType.INT
});
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertTrue(dataBlock.isEndOfStreamBlock());
}
@@ -89,7 +94,7 @@ public class FilterOperatorTest {
DataSchema.ColumnDataType.INT
});
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertTrue(dataBlock.isNoOpBlock());
}
@@ -104,7 +109,7 @@ public class FilterOperatorTest {
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -122,7 +127,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -137,7 +142,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress);
TransferableBlock errorBlock = op.getNextBlock();
Assert.assertTrue(errorBlock.isErrorBlock());
DataBlock data = errorBlock.getDataBlock();
@@ -152,7 +157,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0, 1, 2, _serverAddress);
TransferableBlock errorBlock = op.getNextBlock();
Assert.assertTrue(errorBlock.isErrorBlock());
DataBlock data = errorBlock.getDataBlock();
@@ -167,7 +172,7 @@ public class FilterOperatorTest {
});
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, new Object[]{2, false}));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -187,7 +192,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND",
ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -207,7 +212,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR",
ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -229,7 +234,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT",
ImmutableList.of(new RexExpression.InputRef(0)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -248,7 +253,7 @@ public class FilterOperatorTest {
RexExpression.FunctionCall greaterThan =
new RexExpression.FunctionCall(SqlKind.GREATER_THAN, FieldSpec.DataType.BOOLEAN, "greaterThan",
ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1)));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -268,7 +273,7 @@ public class FilterOperatorTest {
new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWith",
ImmutableList.of(new RexExpression.InputRef(0),
new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2, _serverAddress);
TransferableBlock dataBlock = op.getNextBlock();
Assert.assertFalse(dataBlock.isErrorBlock());
List<Object[]> result = dataBlock.getContainer();
@@ -289,6 +294,6 @@ public class FilterOperatorTest {
new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWithError",
ImmutableList.of(new RexExpression.InputRef(0),
new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
- FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2);
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2, _serverAddress);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index b9a95ba9a0..c77107a58c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.spi.data.FieldSpec;
@@ -51,9 +52,13 @@ public class HashJoinOperatorTest {
@Mock
private MultiStageOperator _rightOperator;
+ @Mock
+ private VirtualServerAddress _serverAddress;
+
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
}
@AfterMethod
@@ -90,7 +95,8 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator joinOnString =
+ new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = joinOnString.nextBlock();
while (result.isNoOpBlock()) {
@@ -127,7 +133,8 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator joinOnInt =
+ new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = joinOnInt.nextBlock();
while (result.isNoOpBlock()) {
result = joinOnInt.nextBlock();
@@ -161,7 +168,8 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
- HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator joinOnInt =
+ new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = joinOnInt.nextBlock();
while (result.isNoOpBlock()) {
result = joinOnInt.nextBlock();
@@ -202,7 +210,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -236,7 +244,7 @@ public class HashJoinOperatorTest {
List<RexExpression> joinClauses = new ArrayList<>();
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -267,7 +275,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -299,9 +307,10 @@ public class HashJoinOperatorTest {
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING
});
+
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -339,7 +348,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
@@ -377,7 +386,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
@@ -411,7 +420,8 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.RIGHT,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator joinOnNum =
+ new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = joinOnNum.nextBlock();
while (result.isNoOpBlock()) {
result = joinOnNum.nextBlock();
@@ -460,14 +470,14 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.SEMI,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
}
List<Object[]> resultRows = result.getContainer();
- List<Object[]> expectedRows = ImmutableList.of(new Object[]{1, "Aa", null, null},
- new Object[]{2, "BB", null, null});
+ List<Object[]> expectedRows =
+ ImmutableList.of(new Object[]{1, "Aa", null, null}, new Object[]{2, "BB", null, null});
Assert.assertEquals(resultRows.size(), expectedRows.size());
Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
@@ -499,7 +509,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.FULL,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
@@ -551,7 +561,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.ANTI,
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
result = join.nextBlock();
@@ -589,7 +599,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -622,7 +632,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock();
while (result.isNoOpBlock()) {
@@ -658,7 +668,7 @@ public class HashJoinOperatorTest {
});
JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
- HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2);
+ HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress);
TransferableBlock result = join.nextBlock(); // first no-op consumes first right data block.
Assert.assertTrue(result.isNoOpBlock());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 185a6d5d53..a8332f22d2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -34,8 +34,14 @@ import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunct
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -43,6 +49,22 @@ import static org.mockito.Mockito.mock;
// TODO: add tests for Agg / GroupBy / Distinct result blocks
public class LeafStageTransferableBlockOperatorTest {
+ private AutoCloseable _mocks;
+
+ @Mock
+ private VirtualServerAddress _serverAddress;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
@Test
public void shouldReturnDataBlockThenMetadataBlock() {
@@ -53,7 +75,7 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -79,7 +101,7 @@ public class LeafStageTransferableBlockOperatorTest {
new SelectionResultsBlock(resultSchema,
Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, desiredSchema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, desiredSchema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -101,7 +123,7 @@ public class LeafStageTransferableBlockOperatorTest {
new SelectionResultsBlock(schema,
Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -126,7 +148,7 @@ public class LeafStageTransferableBlockOperatorTest {
queryContext),
new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock1 = operator.nextBlock();
@@ -156,7 +178,7 @@ public class LeafStageTransferableBlockOperatorTest {
errorBlock,
new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -177,7 +199,7 @@ public class LeafStageTransferableBlockOperatorTest {
new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -198,7 +220,7 @@ public class LeafStageTransferableBlockOperatorTest {
new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -222,7 +244,7 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -245,7 +267,7 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -264,7 +286,7 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
new AggregationResultsBlock(queryContext.getAggregationFunctions(), Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
+ new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -286,7 +308,7 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2);
+ new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
@@ -309,7 +331,7 @@ public class LeafStageTransferableBlockOperatorTest {
new DistinctResultsBlock(mock(DistinctAggregationFunction.class),
new DistinctTable(resultSchema, Collections.emptyList())), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2);
+ new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
@@ -331,7 +353,7 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2);
+ new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 856965cfb4..749431e090 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -23,10 +23,12 @@ import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.plan.PlanRequestContext;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -41,9 +43,13 @@ public class LiteralValueOperatorTest {
@Mock
private PlanRequestContext _context;
+ @Mock
+ private VirtualServerAddress _serverAddress;
+
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
}
@AfterMethod
@@ -58,14 +64,9 @@ public class LiteralValueOperatorTest {
DataSchema schema = new DataSchema(new String[]{"sLiteral", "iLiteral"},
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
List<List<RexExpression>> literals = ImmutableList.of(
- ImmutableList.of(
- new RexExpression.Literal(DataType.STRING, "foo"),
- new RexExpression.Literal(DataType.INT, 1)),
- ImmutableList.of(
- new RexExpression.Literal(DataType.STRING, ""),
- new RexExpression.Literal(DataType.INT, 2))
- );
- LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2);
+ ImmutableList.of(new RexExpression.Literal(DataType.STRING, "foo"), new RexExpression.Literal(DataType.INT, 1)),
+ ImmutableList.of(new RexExpression.Literal(DataType.STRING, ""), new RexExpression.Literal(DataType.INT, 2)));
+ LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2, _serverAddress);
// When:
TransferableBlock transferableBlock = operator.nextBlock();
@@ -81,7 +82,7 @@ public class LiteralValueOperatorTest {
// Given:
DataSchema schema = new DataSchema(new String[]{}, new ColumnDataType[]{});
List<List<RexExpression>> literals = ImmutableList.of(ImmutableList.of());
- LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2);
+ LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2, _serverAddress);
// When:
TransferableBlock transferableBlock = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 71f23ede1d..b916b5cc5e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
@@ -39,6 +40,7 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+
public class MailboxSendOperatorTest {
private static final int DEFAULT_SENDER_STAGE_ID = 0;
@@ -64,6 +66,10 @@ public class MailboxSendOperatorTest {
_mocks = MockitoAnnotations.openMocks(this);
Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(_exchange);
+
+ Mockito.when(_server.getHostname()).thenReturn("mock");
+ Mockito.when(_server.getQueryMailboxPort()).thenReturn(0);
+ Mockito.when(_server.getVirtualId()).thenReturn(0);
}
@AfterMethod
@@ -75,12 +81,12 @@ public class MailboxSendOperatorTest {
@Test
public void shouldSwallowNoOpBlockFromUpstream() {
// Given:
- MailboxSendOperator operator = new MailboxSendOperator(
- _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+ RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
- DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_input.nextBlock())
- .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+ DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+ new VirtualServerAddress(_server));
+ Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
// When:
TransferableBlock block = operator.nextBlock();
@@ -93,13 +99,13 @@ public class MailboxSendOperatorTest {
@Test
public void shouldSendErrorBlock() {
// Given:
- MailboxSendOperator operator = new MailboxSendOperator(
- _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+ RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
- DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+ DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+ new VirtualServerAddress(_server));
TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
- Mockito.when(_input.nextBlock())
- .thenReturn(errorBlock);
+ Mockito.when(_input.nextBlock()).thenReturn(errorBlock);
// When:
TransferableBlock block = operator.nextBlock();
@@ -112,12 +118,12 @@ public class MailboxSendOperatorTest {
@Test
public void shouldSendErrorBlockWhenInputThrows() {
// Given:
- MailboxSendOperator operator = new MailboxSendOperator(
- _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+ RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
- DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_input.nextBlock())
- .thenThrow(new RuntimeException("foo!"));
+ DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+ new VirtualServerAddress(_server));
+ Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!"));
ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
// When:
@@ -132,13 +138,13 @@ public class MailboxSendOperatorTest {
@Test
public void shouldSendEosBlock() {
// Given:
- MailboxSendOperator operator = new MailboxSendOperator(
- _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+ RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
- DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+ DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+ new VirtualServerAddress(_server));
TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock();
- Mockito.when(_input.nextBlock())
- .thenReturn(eosBlock);
+ Mockito.when(_input.nextBlock()).thenReturn(eosBlock);
// When:
TransferableBlock block = operator.nextBlock();
@@ -151,13 +157,13 @@ public class MailboxSendOperatorTest {
@Test
public void shouldSendDataBlock() {
// Given:
- MailboxSendOperator operator = new MailboxSendOperator(
- _mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
+ RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
- DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+ DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
+ new VirtualServerAddress(_server));
TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
- Mockito.when(_input.nextBlock())
- .thenReturn(dataBlock)
+ Mockito.when(_input.nextBlock()).thenReturn(dataBlock)
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
// When:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index a7053c226c..04c3b8c700 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.mockito.Mock;
@@ -47,9 +48,13 @@ public class SortOperatorTest {
@Mock
private MultiStageOperator _input;
+ @Mock
+ private VirtualServerAddress _serverAddress;
+
@BeforeMethod
public void setUp() {
// Initialise every @Mock field of this test class (including _input and _serverAddress).
_mocks = MockitoAnnotations.openMocks(this);
+ // Make the mocked address print like a real VirtualServerAddress("mock", 80, 0) instead of Mockito's default
+ // mock description. NOTE(review): presumably the operator under test uses the address's string form when
+ // recording stats - confirm against the operator code.
+ Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
}
@AfterMethod
@@ -64,7 +69,7 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
@@ -82,7 +87,7 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
@@ -99,7 +104,7 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -116,10 +121,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -139,10 +143,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(1);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -162,10 +165,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -185,10 +187,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -208,10 +209,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -231,10 +231,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -253,10 +252,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 0, 0, schema, 1, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 0, 0, schema, 1, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -275,10 +273,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
@@ -296,10 +293,9 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
.thenReturn(block(schema, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -320,7 +316,7 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0, 1);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
Mockito.when(_input.nextBlock())
.thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -344,7 +340,7 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0, 1);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
Mockito.when(_input.nextBlock())
.thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -368,12 +364,10 @@ public class SortOperatorTest {
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2);
+ SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress);
- Mockito.when(_input.nextBlock())
- .thenReturn(block(schema, new Object[]{2}))
- .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
- .thenReturn(block(schema, new Object[]{1}))
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
+ .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
// When:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index 52ffff08fb..f990882f74 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.spi.data.FieldSpec;
@@ -47,9 +48,13 @@ public class TransformOperatorTest {
@Mock
private MultiStageOperator _upstreamOp;
+ @Mock
+ private VirtualServerAddress _serverAddress;
+
@BeforeMethod
public void setUp() {
// Initialise every @Mock field of this test class (including _upstreamOp and _serverAddress).
_mocks = MockitoAnnotations.openMocks(this);
+ // Make the mocked address print like a real VirtualServerAddress("mock", 80, 0) instead of Mockito's default
+ // mock description. NOTE(review): presumably TransformOperator uses the address's string form when recording
+ // stats - confirm against the operator code.
+ Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString());
}
@AfterMethod
@@ -71,7 +76,8 @@ public class TransformOperatorTest {
RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
TransformOperator op =
- new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2);
+ new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2,
+ _serverAddress);
TransferableBlock result = op.nextBlock();
Assert.assertTrue(!result.isErrorBlock());
@@ -96,7 +102,7 @@ public class TransformOperatorTest {
RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str");
TransformOperator op =
new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
- 2);
+ 2, _serverAddress);
TransferableBlock result = op.nextBlock();
// Literal operands should just output original literals.
Assert.assertTrue(!result.isErrorBlock());
@@ -126,7 +132,8 @@ public class TransformOperatorTest {
DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
TransformOperator op =
- new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
+ new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2,
+ _serverAddress);
TransferableBlock result = op.nextBlock();
Assert.assertTrue(!result.isErrorBlock());
List<Object[]> resultRows = result.getContainer();
@@ -154,7 +161,8 @@ public class TransformOperatorTest {
DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
TransformOperator op =
- new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
+ new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2,
+ _serverAddress);
TransferableBlock result = op.nextBlock();
Assert.assertTrue(result.isErrorBlock());
@@ -175,7 +183,7 @@ public class TransformOperatorTest {
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
TransformOperator op =
new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
- 2);
+ 2, _serverAddress);
TransferableBlock result = op.nextBlock();
Assert.assertTrue(result.isErrorBlock());
DataBlock data = result.getDataBlock();
@@ -199,7 +207,7 @@ public class TransformOperatorTest {
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING});
TransformOperator op =
new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
- 2);
+ 2, _serverAddress);
TransferableBlock result = op.nextBlock();
// First block has two rows
Assert.assertFalse(result.isErrorBlock());
@@ -231,7 +239,7 @@ public class TransformOperatorTest {
DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
});
TransformOperator transform =
- new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema, 1, 2);
+ new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema, 1, 2, _serverAddress);
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*doesn't match "
@@ -244,6 +252,6 @@ public class TransformOperatorTest {
});
RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
TransformOperator transform =
- new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema, 1, 2);
+ new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema, 1, 2, _serverAddress);
}
};
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index b0a09f3f71..d311e9f467 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -33,11 +33,14 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
@@ -65,6 +68,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
private static final Random RANDOM = new Random(42);
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
+ private static Map<String, Set<String>> _tableToSegmentMap = new HashMap<>();
+
@BeforeClass
public void setUp()
throws Exception {
@@ -158,10 +163,26 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
Map<String, Object> reducerConfig = new HashMap<>();
reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, _reducerGrpcPort);
reducerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, _reducerHostname);
- _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort, new PinotConfiguration(reducerConfig),
- ignored -> { });
+ _mailboxService =
+ new GrpcMailboxService(_reducerHostname, _reducerGrpcPort, new PinotConfiguration(reducerConfig), ignored -> {
+ });
_mailboxService.start();
+ Map<String, List<String>> tableToSegmentMap1 = factory1.buildTableSegmentNameMap();
+ Map<String, List<String>> tableToSegmentMap2 = factory2.buildTableSegmentNameMap();
+
+ for (Map.Entry<String, List<String>> entry : tableToSegmentMap1.entrySet()) {
+ _tableToSegmentMap.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ }
+
+ for (Map.Entry<String, List<String>> entry : tableToSegmentMap2.entrySet()) {
+ if (_tableToSegmentMap.containsKey(entry.getKey())) {
+ _tableToSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+ } else {
+ _tableToSegmentMap.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ }
+ }
+
_queryEnvironment =
QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), server2.getPort(),
factory1.buildSchemaMap(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap());
@@ -189,7 +210,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
public void testQueryTestCasesWithH2(String testCaseName, String sql, String expect)
throws Exception {
// query pinot
- runQuery(sql, expect).ifPresent(rows -> {
+ runQuery(sql, expect, null).ifPresent(rows -> {
try {
compareRowEquals(rows, queryH2(sql));
} catch (Exception e) {
@@ -201,13 +222,25 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
@Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
public void testQueryTestCasesWithOutput(String testCaseName, String sql, List<Object[]> expectedRows, String expect)
throws Exception {
- runQuery(sql, expect).ifPresent(rows -> compareRowEquals(rows, expectedRows));
+ runQuery(sql, expect, null).ifPresent(rows -> compareRowEquals(rows, expectedRows));
+ }
+
+ /**
+ * Runs one query case from {@code testResourceQueryTestCaseProviderWithMetadata} and checks the segment count
+ * reported in the aggregated execution stats.
+ *
+ * <p>A fresh {@link ExecutionStatsAggregator} is handed to {@code runQuery}. If {@code runQuery} returns rows, the
+ * aggregator's stats are written into a new {@link BrokerResponseNative} and its {@code numSegmentsQueried} must
+ * equal {@code numSegments}. If {@code runQuery} returns an empty {@code Optional}, no metadata assertion is made.
+ *
+ * @param testCaseName name of the test case; not read by this method body
+ * @param sql query text passed to {@code runQuery}
+ * @param expect expected error message passed to {@code runQuery}, or {@code null} when rows are expected
+ * @param numSegments expected value of {@code numSegmentsQueried}
+ */
+ @Test(dataProvider = "testResourceQueryTestCaseProviderWithMetadata")
+ public void testQueryTestCasesWithMetadata(String testCaseName, String sql, String expect, int numSegments)
+ throws Exception {
+ // NOTE(review): the meaning of the boolean constructor flag is not visible in this patch excerpt - confirm.
+ ExecutionStatsAggregator executionStatsAggregator = new ExecutionStatsAggregator(false);
+ // The returned rows themselves are not inspected here; only the stats collected while running the query are.
+ runQuery(sql, expect, executionStatsAggregator).ifPresent(rows -> {
+ BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
+ executionStatsAggregator.setStats(brokerResponseNative);
+ Assert.assertEquals(brokerResponseNative.getNumSegmentsQueried(), numSegments);
+ });
}
- private Optional<List<Object[]>> runQuery(String sql, final String except) {
+ private Optional<List<Object[]>> runQuery(String sql, final String except,
+ ExecutionStatsAggregator executionStatsAggregator) {
try {
// query pinot
- List<Object[]> resultRows = queryRunner(sql);
+ List<Object[]> resultRows = queryRunner(sql, executionStatsAggregator);
Assert.assertNull(except,
"Expected error with message '" + except + "'. But instead rows were returned: " + resultRows.stream()
@@ -252,6 +285,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
for (List<Object> objs : orgRows) {
expectedRows.add(objs.toArray());
}
+
Object[] testEntry = new Object[]{testCaseName, sql, expectedRows, queryCase._expectedException};
providerContent.add(testEntry);
}
@@ -260,6 +294,48 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
return providerContent.toArray(new Object[][]{});
}
+  /**
+   * Supplies the cases for {@code testQueryTestCasesWithMetadata}: one row of
+   * {@code {testCaseName, sql, expectedException, segmentCount}} per eligible query.
+   *
+   * <p>A query is eligible when neither it nor its enclosing test case is marked ignored, it declares expected
+   * outputs, and its SQL (after table-name replacement) contains {@code "basic_test"}.
+   *
+   * <p>{@code segmentCount} is the sum, over every table declared by the test case, of the number of segment names
+   * registered in {@code _tableToSegmentMap} under {@code <testCaseName>_<tableName>_OFFLINE}; a table with no entry
+   * contributes zero. The map must therefore be populated before this provider is invoked.
+   *
+   * @return the provider rows, possibly empty
+   * @throws Exception if the test-case resources cannot be loaded by {@code getTestCases()}
+   */
+  @DataProvider
+  private static Object[][] testResourceQueryTestCaseProviderWithMetadata()
+      throws Exception {
+    Map<String, QueryTestCase> testCaseMap = getTestCases();
+    List<Object[]> providerContent = new ArrayList<>();
+    for (Map.Entry<String, QueryTestCase> testCaseEntry : testCaseMap.entrySet()) {
+      String testCaseName = testCaseEntry.getKey();
+      QueryTestCase testCase = testCaseEntry.getValue();
+      if (testCase._ignored) {
+        continue;
+      }
+
+      for (QueryTestCase.Query queryCase : testCase._queries) {
+        // Only queries with declared outputs are expected to return rows, which the metadata check requires.
+        if (queryCase._ignored || queryCase._outputs == null) {
+          continue;
+        }
+
+        String sql = replaceTableName(testCaseName, queryCase._sql);
+        if (!sql.contains("basic_test")) {
+          continue;
+        }
+
+        // The expected rows are deliberately not materialised: the metadata test never compares them.
+        int segmentCount = 0;
+        for (String tableName : testCase._tables.keySet()) {
+          Set<String> segments = _tableToSegmentMap.get(testCaseName + "_" + tableName + "_OFFLINE");
+          if (segments != null) {
+            segmentCount += segments.size();
+          }
+        }
+
+        providerContent.add(new Object[]{testCaseName, sql, queryCase._expectedException, segmentCount});
+      }
+    }
+    return providerContent.toArray(new Object[][]{});
+  }
+
@DataProvider
private static Object[][] testResourceQueryTestCaseProviderInputOnly()
throws Exception {
@@ -319,8 +395,9 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
URL testFileUrl = classLoader.getResource(testCaseFile);
// This test only supports local resource loading (e.g. must be a file), not support JAR test loading.
if (testFileUrl != null && new File(testFileUrl.getFile()).exists()) {
- Map<String, QueryTestCase> testCases = MAPPER.readValue(new File(testFileUrl.getFile()),
- new TypeReference<Map<String, QueryTestCase>>() { });
+ Map<String, QueryTestCase> testCases =
+ MAPPER.readValue(new File(testFileUrl.getFile()), new TypeReference<Map<String, QueryTestCase>>() {
+ });
{
HashSet<String> hashSet = new HashSet<>(testCaseMap.keySet());
hashSet.retainAll(testCases.keySet());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org