You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/11 02:36:31 UTC
[pinot] branch master updated: GRPC broker request handler (#7838)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 12edbdb GRPC broker request handler (#7838)
12edbdb is described below
commit 12edbdb1e521e0d9ee35a2d6ef2fed172e5beaa1
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Fri Dec 10 18:36:05 2021 -0800
GRPC broker request handler (#7838)
Adds a GrpcBrokerRequestHandler framework parallel to the SingleConnectionBrokerRequestHandler.
This handler receives data streamed back from the servers and performs the reduce in a streaming fashion.
---
.../broker/broker/helix/BaseBrokerStarter.java | 22 +-
.../requesthandler/BaseBrokerRequestHandler.java | 4 -
.../requesthandler/GrpcBrokerRequestHandler.java | 156 +++++++++++
.../SingleConnectionBrokerRequestHandler.java | 4 +
.../pinot/core/query/reduce/BaseReduceService.java | 305 +++++++++++++++++++++
.../core/query/reduce/BrokerReduceService.java | 244 +----------------
.../core/query/reduce/ResultReducerFactory.java | 10 +
.../reduce/SelectionOnlyStreamingReducer.java | 98 +++++++
.../core/query/reduce/StreamingReduceService.java | 153 +++++++++++
.../pinot/core/query/reduce/StreamingReducer.java | 33 +++
.../pinot/core/transport/ServerInstance.java | 36 +++
.../pinot/integration/tests/ClusterTest.java | 29 +-
.../tests/GrpcBrokerClusterIntegrationTest.java | 131 +++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 5 +
14 files changed, 980 insertions(+), 250 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 527719a..b7e4c2d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -41,6 +41,7 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.common.Utils;
@@ -235,14 +236,23 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
// Configure TLS for netty connection to server
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
- if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
+ if (_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE)
+ .equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
+ LOGGER.info("Starting Grpc BrokerRequestHandler.");
_brokerRequestHandler =
- new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
- queryQuotaManager, tableCache, _brokerMetrics, tlsDefaults);
- } else {
- _brokerRequestHandler =
- new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+ new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, null);
+ } else { // default request handler type, e.g. netty
+ LOGGER.info("Starting Netty BrokerRequestHandler.");
+ if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
+ _brokerRequestHandler =
+ new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+ queryQuotaManager, tableCache, _brokerMetrics, tlsDefaults);
+ } else {
+ _brokerRequestHandler =
+ new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+ queryQuotaManager, tableCache, _brokerMetrics, null);
+ }
}
LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 8cbd52e..bb8f475 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -78,7 +78,6 @@ import org.apache.pinot.common.utils.helix.TableCache;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
-import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.QueryOptionsUtils;
@@ -117,7 +116,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final AtomicLong _requestIdGenerator = new AtomicLong();
protected final QueryOptimizer _queryOptimizer = new QueryOptimizer();
- protected final BrokerReduceService _brokerReduceService;
protected final String _brokerId;
protected final long _brokerTimeoutMs;
@@ -159,8 +157,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);
-
- _brokerReduceService = new BrokerReduceService(_config);
LOGGER.info(
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
_brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
new file mode 100644
index 0000000..7d0722e
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.broker.api.RequestStatistics;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.RoutingManager;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
+import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
+import org.apache.pinot.common.utils.helix.TableCache;
+import org.apache.pinot.core.query.reduce.StreamingReduceService;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.transport.TlsConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * The <code>GrpcBrokerRequestHandler</code> class sends query requests to servers via gRPC and reduces the streamed
+ * server responses with a {@link StreamingReduceService}.
+ *
+ * <p>NOTE: the {@link TlsConfig} passed to the constructor is currently ignored, and every gRPC client is created with
+ * a default {@link GrpcQueryClient.Config}.
+ */
+@ThreadSafe
+public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
+
+  private final GrpcQueryClient.Config _grpcConfig;
+  private final StreamingReduceService _streamingReduceService;
+  private final PinotStreamingQueryClient _streamingQueryClient;
+
+  /**
+   * Creates the handler together with its per-server gRPC client cache and its streaming reduce service.
+   *
+   * @param tlsConfig currently unused. TODO(review): wire TLS settings into the gRPC client config.
+   */
+  public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
+    super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+    _grpcConfig = buildGrpcQueryClientConfig(config);
+
+    // create streaming query client
+    _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig);
+
+    // create streaming reduce service
+    _streamingReduceService = new StreamingReduceService(config);
+  }
+
+  @Override
+  public void start() {
+    // Nothing to start: gRPC clients are created lazily, on the first request to each server.
+  }
+
+  /**
+   * Shuts down the streaming reduce service.
+   *
+   * <p>TODO(review): the cached gRPC clients held by {@link PinotStreamingQueryClient} are not released here; confirm
+   * whether {@link GrpcQueryClient} exposes a way to close its channel and call it on shutdown.
+   */
+  @Override
+  public synchronized void shutDown() {
+    _streamingReduceService.shutDown();
+  }
+
+  /**
+   * Sends the offline and/or realtime sub-requests to their routed servers over gRPC, then reduces all streamed
+   * server responses into a single {@link BrokerResponseNative}.
+   */
+  @Override
+  protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
+      @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+      @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
+      long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
+      throws Exception {
+    assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
+
+    String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName());
+    // One streaming response iterator per (server, table type) pair.
+    Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>();
+
+    if (offlineBrokerRequest != null) {
+      // Send to offline servers.
+      assert offlineRoutingTable != null;
+      streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.OFFLINE, responseMap,
+          offlineBrokerRequest, offlineRoutingTable, timeoutMs, true, 1);
+    }
+    if (realtimeBrokerRequest != null) {
+      // Send to realtime servers.
+      assert realtimeRoutingTable != null;
+      streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.REALTIME, responseMap,
+          realtimeBrokerRequest, realtimeRoutingTable, timeoutMs, true, 1);
+    }
+    return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs,
+        _brokerMetrics);
+  }
+
+  /**
+   * Submits a streaming query to every server in {@code routingTable}, and stores each server's response iterator in
+   * {@code responseMap}, keyed by that server's routing instance for {@code tableType}.
+   *
+   * <p>NOTE: {@code requestId}, {@code brokerHost}, {@code rawTableName}, {@code connectionTimeoutInMillis},
+   * {@code ignoreEmptyResponses} and {@code pinotRetryCount} are currently unused. No retry is attempted, and the
+   * timeout is not passed to the gRPC client.
+   */
+  public void streamingQueryToPinotServer(final long requestId, final String brokerHost, final String rawTableName,
+      final TableType tableType, Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap,
+      BrokerRequest brokerRequest, Map<ServerInstance, List<String>> routingTable, long connectionTimeoutInMillis,
+      boolean ignoreEmptyResponses, int pinotRetryCount) {
+    for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) {
+      ServerInstance serverInstance = routingEntry.getKey();
+      List<String> segments = routingEntry.getValue();
+      String serverHost = serverInstance.getHostname();
+      int port = serverInstance.getGrpcPort();
+      // TODO: enable throttling on per host bases.
+      Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port,
+          new GrpcRequestBuilder()
+              .setSegments(segments)
+              .setBrokerRequest(brokerRequest)
+              .setEnableStreaming(true));
+      responseMap.put(serverInstance.toServerRoutingInstance(tableType), streamingResponse);
+    }
+  }
+
+  /**
+   * Builds the gRPC client config. For now this always returns a default config; {@code config} is not consulted.
+   */
+  private GrpcQueryClient.Config buildGrpcQueryClientConfig(PinotConfiguration config) {
+    return new GrpcQueryClient.Config();
+  }
+
+  /**
+   * Caches one {@link GrpcQueryClient} per server host/port pair and submits requests through it.
+   */
+  public static class PinotStreamingQueryClient {
+    private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new ConcurrentHashMap<>();
+    private final GrpcQueryClient.Config _config;
+
+    public PinotStreamingQueryClient(GrpcQueryClient.Config config) {
+      _config = config;
+    }
+
+    /**
+     * Submits the built request to the server at {@code host:port}, creating that server's client on first use.
+     *
+     * @return the iterator over the server's streamed responses
+     */
+    public Iterator<Server.ServerResponse> submit(String host, int port, GrpcRequestBuilder requestBuilder) {
+      GrpcQueryClient client = getOrCreateGrpcQueryClient(host, port);
+      return client.submit(requestBuilder.build());
+    }
+
+    private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port) {
+      // ConcurrentHashMap.computeIfAbsent is atomic, so concurrent callers share a single client per "<host>_<port>".
+      String key = String.format("%s_%d", host, port);
+      return _grpcQueryClientMap.computeIfAbsent(key, k -> new GrpcQueryClient(host, port, _config));
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 76deaa5..f585df72 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.helix.TableCache;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
@@ -55,12 +56,15 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
*/
@ThreadSafe
public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandler {
+ private final BrokerReduceService _brokerReduceService;
private final QueryRouter _queryRouter;
public SingleConnectionBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+
+ _brokerReduceService = new BrokerReduceService(_config);
_queryRouter = new QueryRouter(_brokerId, brokerMetrics, tlsConfig);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
new file mode 100644
index 0000000..6a6d1c6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for the broker reduce services. It reads the reduce-related broker configs, owns a reduce thread pool,
+ * and provides helpers for subclasses: SELECT-alias rewriting and execution-statistics aggregation.
+ */
+@ThreadSafe
+public abstract class BaseReduceService {
+
+  // Set the reducer priority higher than NORM but lower than MAX, because if a query is complete
+  // we want to deserialize and return response as soon. This is the same as server side 'pqr' threads.
+  protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;
+  // brw -> Shorthand for broker reduce worker threads.
+  protected static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseReduceService.class);
+
+  protected final ExecutorService _reduceExecutorService;
+  protected final int _maxReduceThreadsPerQuery;
+  protected final int _groupByTrimThreshold;
+
+  /**
+   * Reads the max-reduce-threads-per-query and group-by trim threshold configs. Then creates a fixed-size,
+   * non-daemon reduce thread pool with one thread per available processor.
+   */
+  public BaseReduceService(PinotConfiguration config) {
+    _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
+        CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
+    _groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
+        CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
+
+    int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
+    // Log the concrete subclass name: this constructor is shared by every reduce service implementation.
+    LOGGER.info("Initializing {} with {} threads, and {} max reduce threads.", getClass().getSimpleName(),
+        numThreadsInExecutorService, _maxReduceThreadsPerQuery);
+
+    ThreadFactory reduceThreadFactory =
+        new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY)
+            .setNameFormat(REDUCE_THREAD_NAME_FORMAT).build();
+
+    // ExecutorService is initialized with numThreads same as availableProcessors.
+    _reduceExecutorService = Executors.newFixedThreadPool(numThreadsInExecutorService, reduceThreadFactory);
+  }
+
+  /**
+   * Replaces result-table column names with their SELECT aliases, where an alias is present.
+   *
+   * <p>The update is skipped in three cases: when there is no result table, when the query has no aliases, or when the
+   * column count differs from the number of SELECT expressions (e.g. {@code SELECT *}).
+   */
+  protected static void updateAlias(QueryContext queryContext, BrokerResponseNative brokerResponseNative) {
+    ResultTable resultTable = brokerResponseNative.getResultTable();
+    if (resultTable == null) {
+      return;
+    }
+    List<String> aliasList = queryContext.getAliasList();
+    if (aliasList.isEmpty()) {
+      return;
+    }
+
+    // The column-name array is modified in place.
+    String[] columnNames = resultTable.getDataSchema().getColumnNames();
+    List<ExpressionContext> selectExpressions = getSelectExpressions(queryContext.getSelectExpressions());
+    int numSelectExpressions = selectExpressions.size();
+    // For query like `SELECT *`, we skip alias update.
+    if (columnNames.length != numSelectExpressions) {
+      return;
+    }
+    for (int i = 0; i < numSelectExpressions; i++) {
+      String alias = aliasList.get(i);
+      if (alias != null) {
+        columnNames[i] = alias;
+      }
+    }
+  }
+
+  /**
+   * Returns the effective SELECT expressions. A single {@code distinct(...)} function is unwrapped into its arguments.
+   */
+  protected static List<ExpressionContext> getSelectExpressions(List<ExpressionContext> selectExpressions) {
+    // NOTE: For DISTINCT queries, need to extract the arguments as the SELECT expressions
+    if (selectExpressions.size() == 1 && selectExpressions.get(0).getType() == ExpressionContext.Type.FUNCTION
+        && selectExpressions.get(0).getFunction().getFunctionName().equals("distinct")) {
+      return selectExpressions.get(0).getFunction().getArguments();
+    }
+    return selectExpressions;
+  }
+
+  /**
+   * Shuts down the reduce thread pool immediately, attempting to interrupt any running reduce tasks.
+   */
+  protected void shutDown() {
+    _reduceExecutorService.shutdownNow();
+  }
+
+  /**
+   * Accumulates exceptions, trace info and execution statistics from server {@link DataTable}s, then writes the
+   * totals into a {@link BrokerResponseNative} and the broker metrics.
+   *
+   * <p>Both {@link #aggregate} and {@link #setStats} are synchronized on this instance, so the two methods see
+   * consistent state.
+   */
+  protected static class ExecutionStatsAggregator {
+    private final List<QueryProcessingException> _processingExceptions = new ArrayList<>();
+    private final Map<String, String> _traceInfo = new HashMap<>();
+    private final boolean _enableTrace;
+
+    private long _numDocsScanned = 0L;
+    private long _numEntriesScannedInFilter = 0L;
+    private long _numEntriesScannedPostFilter = 0L;
+    private long _numSegmentsQueried = 0L;
+    private long _numSegmentsProcessed = 0L;
+    private long _numSegmentsMatched = 0L;
+    private long _numConsumingSegmentsProcessed = 0L;
+    private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+    private long _numTotalDocs = 0L;
+    private long _offlineThreadCpuTimeNs = 0L;
+    private long _realtimeThreadCpuTimeNs = 0L;
+    private long _offlineSystemActivitiesCpuTimeNs = 0L;
+    private long _realtimeSystemActivitiesCpuTimeNs = 0L;
+    private long _offlineResponseSerializationCpuTimeNs = 0L;
+    private long _realtimeResponseSerializationCpuTimeNs = 0L;
+    private long _offlineTotalCpuTimeNs = 0L;
+    private long _realtimeTotalCpuTimeNs = 0L;
+    private boolean _numGroupsLimitReached = false;
+
+    protected ExecutionStatsAggregator(boolean enableTrace) {
+      _enableTrace = enableTrace;
+    }
+
+    /**
+     * Merges one server response's metadata and exceptions into the running totals.
+     *
+     * <p>CPU times go to the offline or realtime bucket based on the routing instance's table type. Trace info is keyed
+     * by hostname, so a later response from the same host overwrites an earlier one.
+     */
+    protected synchronized void aggregate(ServerRoutingInstance routingInstance, DataTable dataTable) {
+      Map<String, String> metadata = dataTable.getMetadata();
+      // Reduce on trace info.
+      if (_enableTrace) {
+        _traceInfo.put(routingInstance.getHostname(), metadata.get(MetadataKey.TRACE_INFO.getName()));
+      }
+
+      // Reduce on exceptions.
+      for (Map.Entry<Integer, String> exception : dataTable.getExceptions().entrySet()) {
+        _processingExceptions.add(new QueryProcessingException(exception.getKey(), exception.getValue()));
+      }
+
+      // Reduce on execution statistics.
+      String numDocsScannedString = metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
+      if (numDocsScannedString != null) {
+        _numDocsScanned += Long.parseLong(numDocsScannedString);
+      }
+      String numEntriesScannedInFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
+      if (numEntriesScannedInFilterString != null) {
+        _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
+      }
+      String numEntriesScannedPostFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
+      if (numEntriesScannedPostFilterString != null) {
+        _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
+      }
+      String numSegmentsQueriedString = metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
+      if (numSegmentsQueriedString != null) {
+        _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
+      }
+
+      String numSegmentsProcessedString = metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
+      if (numSegmentsProcessedString != null) {
+        _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
+      }
+      String numSegmentsMatchedString = metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
+      if (numSegmentsMatchedString != null) {
+        _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
+      }
+
+      String numConsumingString = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
+      if (numConsumingString != null) {
+        _numConsumingSegmentsProcessed += Long.parseLong(numConsumingString);
+      }
+
+      // Keep the oldest (minimum) freshness time reported by any server.
+      String minConsumingFreshnessTimeMsString = metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
+      if (minConsumingFreshnessTimeMsString != null) {
+        _minConsumingFreshnessTimeMs =
+            Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
+      }
+
+      String threadCpuTimeNsString = metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
+      if (threadCpuTimeNsString != null) {
+        if (routingInstance.getTableType() == TableType.OFFLINE) {
+          _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+        } else {
+          _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+        }
+      }
+
+      String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
+      if (systemActivitiesCpuTimeNsString != null) {
+        if (routingInstance.getTableType() == TableType.OFFLINE) {
+          _offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+        } else {
+          _realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+        }
+      }
+
+      String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+      if (responseSerializationCpuTimeNsString != null) {
+        if (routingInstance.getTableType() == TableType.OFFLINE) {
+          _offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+        } else {
+          _realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+        }
+      }
+      // Totals are recomputed from the per-category sums after every response.
+      _offlineTotalCpuTimeNs =
+          _offlineThreadCpuTimeNs + _offlineSystemActivitiesCpuTimeNs + _offlineResponseSerializationCpuTimeNs;
+      _realtimeTotalCpuTimeNs =
+          _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
+
+      String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName());
+      if (numTotalDocsString != null) {
+        _numTotalDocs += Long.parseLong(numTotalDocsString);
+      }
+      _numGroupsLimitReached |= Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+    }
+
+    /**
+     * Copies the aggregated exceptions, trace info and statistics into {@code brokerResponseNative}. When
+     * {@code brokerMetrics} is non-null, also emits the corresponding table-level metrics for {@code rawTableName}.
+     *
+     * <p>Synchronized so that the state written by {@link #aggregate} on other threads is visible here.
+     */
+    protected synchronized void setStats(String rawTableName, BrokerResponseNative brokerResponseNative,
+        BrokerMetrics brokerMetrics) {
+      // set exception
+      List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
+      processingExceptions.addAll(_processingExceptions);
+
+      // add all trace.
+      if (_enableTrace) {
+        brokerResponseNative.getTraceInfo().putAll(_traceInfo);
+      }
+
+      // Set execution statistics.
+      brokerResponseNative.setNumDocsScanned(_numDocsScanned);
+      brokerResponseNative.setNumEntriesScannedInFilter(_numEntriesScannedInFilter);
+      brokerResponseNative.setNumEntriesScannedPostFilter(_numEntriesScannedPostFilter);
+      brokerResponseNative.setNumSegmentsQueried(_numSegmentsQueried);
+      brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
+      brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
+      brokerResponseNative.setTotalDocs(_numTotalDocs);
+      brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
+      brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
+      brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
+      brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
+      brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(_realtimeSystemActivitiesCpuTimeNs);
+      brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(_offlineResponseSerializationCpuTimeNs);
+      brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(_realtimeResponseSerializationCpuTimeNs);
+      brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
+      brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
+      // Consuming-segment stats are only reported when at least one consuming segment was processed.
+      if (_numConsumingSegmentsProcessed > 0) {
+        brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsProcessed);
+        brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
+      }
+
+      // Update broker metrics.
+      if (brokerMetrics != null) {
+        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, _numDocsScanned);
+        brokerMetrics
+            .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, _numEntriesScannedInFilter);
+        brokerMetrics
+            .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, _numEntriesScannedPostFilter);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, _offlineThreadCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS,
+            _realtimeThreadCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+            _offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+            _realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
+            _offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
+            _realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, _offlineTotalCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+
+        // Freshness lag is only meaningful when a consuming segment reported a positive freshness time.
+        if (_numConsumingSegmentsProcessed > 0 && _minConsumingFreshnessTimeMs > 0) {
+          brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
+              System.currentTimeMillis() - _minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index fa22e54..f2796fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -18,37 +18,22 @@
*/
package org.apache.pinot.core.query.reduce;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -56,68 +41,9 @@ import org.slf4j.LoggerFactory;
* to {@link BrokerResponseNative}.
*/
@ThreadSafe
-public class BrokerReduceService {
-
- // Set the reducer priority higher than NORM but lower than MAX, because if a query is complete
- // we want to deserialize and return response as soon. This is the same as server side 'pqr' threads.
- protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;
- private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
- // brw -> Shorthand for broker reduce worker threads.
- private static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";
- private final ExecutorService _reduceExecutorService;
- private final int _maxReduceThreadsPerQuery;
- private final int _groupByTrimThreshold;
-
+public class BrokerReduceService extends BaseReduceService {
public BrokerReduceService(PinotConfiguration config) {
- _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
- CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
- _groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
- CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
-
- int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
- LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
- numThreadsInExecutorService, _maxReduceThreadsPerQuery);
-
- ThreadFactory reduceThreadFactory =
- new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY)
- .setNameFormat(REDUCE_THREAD_NAME_FORMAT).build();
-
- // ExecutorService is initialized with numThreads same as availableProcessors.
- _reduceExecutorService = Executors.newFixedThreadPool(numThreadsInExecutorService, reduceThreadFactory);
- }
-
- private static void updateAlias(QueryContext queryContext, BrokerResponseNative brokerResponseNative) {
- ResultTable resultTable = brokerResponseNative.getResultTable();
- if (resultTable == null) {
- return;
- }
- List<String> aliasList = queryContext.getAliasList();
- if (aliasList.isEmpty()) {
- return;
- }
-
- String[] columnNames = resultTable.getDataSchema().getColumnNames();
- List<ExpressionContext> selectExpressions = getSelectExpressions(queryContext.getSelectExpressions());
- int numSelectExpressions = selectExpressions.size();
- // For query like `SELECT *`, we skip alias update.
- if (columnNames.length != numSelectExpressions) {
- return;
- }
- for (int i = 0; i < numSelectExpressions; i++) {
- String alias = aliasList.get(i);
- if (alias != null) {
- columnNames[i] = alias;
- }
- }
- }
-
- private static List<ExpressionContext> getSelectExpressions(List<ExpressionContext> selectExpressions) {
- // NOTE: For DISTINCT queries, need to extract the arguments as the SELECT expressions
- if (selectExpressions.size() == 1 && selectExpressions.get(0).getType() == ExpressionContext.Type.FUNCTION
- && selectExpressions.get(0).getFunction().getFunctionName().equals("distinct")) {
- return selectExpressions.get(0).getFunction().getArguments();
- }
- return selectExpressions;
+ super(config);
}
public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest,
@@ -127,34 +53,15 @@ public class BrokerReduceService {
return BrokerResponseNative.empty();
}
- BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
- List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
- long numDocsScanned = 0L;
- long numEntriesScannedInFilter = 0L;
- long numEntriesScannedPostFilter = 0L;
- long numSegmentsQueried = 0L;
- long numSegmentsProcessed = 0L;
- long numSegmentsMatched = 0L;
- long numConsumingSegmentsProcessed = 0L;
- long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
- long numTotalDocs = 0L;
- long offlineThreadCpuTimeNs = 0L;
- long realtimeThreadCpuTimeNs = 0L;
- long offlineSystemActivitiesCpuTimeNs = 0L;
- long realtimeSystemActivitiesCpuTimeNs = 0L;
- long offlineResponseSerializationCpuTimeNs = 0L;
- long realtimeResponseSerializationCpuTimeNs = 0L;
- long offlineTotalCpuTimeNs = 0L;
- long realtimeTotalCpuTimeNs = 0L;
-
- boolean numGroupsLimitReached = false;
-
PinotQuery pinotQuery = brokerRequest.getPinotQuery();
Map<String, String> queryOptions =
pinotQuery != null ? pinotQuery.getQueryOptions() : brokerRequest.getQueryOptions();
boolean enableTrace =
queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
+ ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
+ BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
+
// Cache a data schema from data tables (try to cache one with data rows associated with it).
DataSchema cachedDataSchema = null;
@@ -163,94 +70,9 @@ public class BrokerReduceService {
while (iterator.hasNext()) {
Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
DataTable dataTable = entry.getValue();
- Map<String, String> metadata = dataTable.getMetadata();
-
- // Reduce on trace info.
- if (enableTrace) {
- brokerResponseNative.getTraceInfo()
- .put(entry.getKey().getHostname(), metadata.get(MetadataKey.TRACE_INFO.getName()));
- }
-
- // Reduce on exceptions.
- Map<Integer, String> exceptions = dataTable.getExceptions();
- for (int key : exceptions.keySet()) {
- processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
- }
-
- // Reduce on execution statistics.
- String numDocsScannedString = metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
- if (numDocsScannedString != null) {
- numDocsScanned += Long.parseLong(numDocsScannedString);
- }
- String numEntriesScannedInFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
- if (numEntriesScannedInFilterString != null) {
- numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
- }
- String numEntriesScannedPostFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
- if (numEntriesScannedPostFilterString != null) {
- numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
- }
- String numSegmentsQueriedString = metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
- if (numSegmentsQueriedString != null) {
- numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
- }
-
- String numSegmentsProcessedString = metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
- if (numSegmentsProcessedString != null) {
- numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
- }
- String numSegmentsMatchedString = metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
- if (numSegmentsMatchedString != null) {
- numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
- }
-
- String numConsumingString = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
- if (numConsumingString != null) {
- numConsumingSegmentsProcessed += Long.parseLong(numConsumingString);
- }
- String minConsumingFreshnessTimeMsString = metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
- if (minConsumingFreshnessTimeMsString != null) {
- minConsumingFreshnessTimeMs =
- Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), minConsumingFreshnessTimeMs);
- }
-
- String threadCpuTimeNsString = metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
- if (threadCpuTimeNsString != null) {
- if (entry.getKey().getTableType() == TableType.OFFLINE) {
- offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
- } else {
- realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
- }
- }
-
- String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
- if (systemActivitiesCpuTimeNsString != null) {
- if (entry.getKey().getTableType() == TableType.OFFLINE) {
- offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
- } else {
- realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
- }
- }
-
- String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
- if (responseSerializationCpuTimeNsString != null) {
- if (entry.getKey().getTableType() == TableType.OFFLINE) {
- offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
- } else {
- realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
- }
- }
- offlineTotalCpuTimeNs =
- offlineThreadCpuTimeNs + offlineSystemActivitiesCpuTimeNs + offlineResponseSerializationCpuTimeNs;
- realtimeTotalCpuTimeNs =
- realtimeThreadCpuTimeNs + realtimeSystemActivitiesCpuTimeNs + realtimeResponseSerializationCpuTimeNs;
-
- String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName());
- if (numTotalDocsString != null) {
- numTotalDocs += Long.parseLong(numTotalDocsString);
- }
- numGroupsLimitReached |= Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+ // aggregate metrics
+ aggregator.aggregate(entry.getKey(), dataTable);
// After processing the metadata, remove data tables without data rows inside.
DataSchema dataSchema = dataTable.getDataSchema();
@@ -269,59 +91,11 @@ public class BrokerReduceService {
}
}
- // Set execution statistics.
- brokerResponseNative.setNumDocsScanned(numDocsScanned);
- brokerResponseNative.setNumEntriesScannedInFilter(numEntriesScannedInFilter);
- brokerResponseNative.setNumEntriesScannedPostFilter(numEntriesScannedPostFilter);
- brokerResponseNative.setNumSegmentsQueried(numSegmentsQueried);
- brokerResponseNative.setNumSegmentsProcessed(numSegmentsProcessed);
- brokerResponseNative.setNumSegmentsMatched(numSegmentsMatched);
- brokerResponseNative.setTotalDocs(numTotalDocs);
- brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
- brokerResponseNative.setOfflineThreadCpuTimeNs(offlineThreadCpuTimeNs);
- brokerResponseNative.setRealtimeThreadCpuTimeNs(realtimeThreadCpuTimeNs);
- brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(offlineSystemActivitiesCpuTimeNs);
- brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(realtimeSystemActivitiesCpuTimeNs);
- brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(offlineResponseSerializationCpuTimeNs);
- brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(realtimeResponseSerializationCpuTimeNs);
- brokerResponseNative.setOfflineTotalCpuTimeNs(offlineTotalCpuTimeNs);
- brokerResponseNative.setRealtimeTotalCpuTimeNs(realtimeTotalCpuTimeNs);
- if (numConsumingSegmentsProcessed > 0) {
- brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsProcessed);
- brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
- }
-
- // Update broker metrics.
String tableName = brokerRequest.getQuerySource().getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
- if (brokerMetrics != null) {
- brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, numDocsScanned);
- brokerMetrics
- .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
- brokerMetrics
- .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, offlineThreadCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS, realtimeThreadCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
- offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
- realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
- offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
- realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, offlineTotalCpuTimeNs,
- TimeUnit.NANOSECONDS);
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, realtimeTotalCpuTimeNs,
- TimeUnit.NANOSECONDS);
- if (numConsumingSegmentsProcessed > 0 && minConsumingFreshnessTimeMs > 0) {
- brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
- System.currentTimeMillis() - minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
- }
- }
+ // Set execution statistics and Update broker metrics.
+ aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
// NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
// response with metadata only.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index e5e9bf8..a48a007 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -22,6 +22,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -65,4 +66,13 @@ public final class ResultReducerFactory {
}
}
}
+
+ /**
+  * Returns the {@link StreamingReducer} to use for the given query.
+  *
+  * <p>Only selection-only queries (selection without ORDER BY) are supported: the sole implementation,
+  * {@link SelectionOnlyStreamingReducer}, keeps rows in arrival order and applies no ordering.
+  *
+  * @param queryContext the query to build a reducer for
+  * @return a reducer that accumulates data tables as server responses stream in
+  * @throws UnsupportedOperationException if the query is not a selection query, or has ORDER BY expressions
+  */
+ public static StreamingReducer getStreamingReducer(QueryContext queryContext) {
+   if (!QueryContextUtils.isSelectionQuery(queryContext) || queryContext.getOrderByExpressions() != null) {
+     throw new UnsupportedOperationException(
+         "Only selection queries without ORDER BY are supported for streaming reduce");
+   }
+   return new SelectionOnlyStreamingReducer(queryContext);
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
new file mode 100644
index 0000000..7fa7f84
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.QueryOptionsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link StreamingReducer} for selection-only queries (no ORDER BY). It keeps the first {@code LIMIT} rows received
+ * from the servers, in arrival order, and renders them into a {@link ResultTable} on {@link #seal()}.
+ *
+ * <p>{@link #reduce} may be called from several worker threads, one per server stream, so both {@link #reduce} and
+ * {@link #seal()} are synchronized on this instance.
+ */
+public class SelectionOnlyStreamingReducer implements StreamingReducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOnlyStreamingReducer.class);
+
+  private final QueryContext _queryContext;
+  // NOTE(review): read from the query options but not used by the streaming render path yet.
+  private final boolean _preserveType;
+  private final int _limit;
+
+  // Schema of the first data table received; stays null until some server returns a data block.
+  private DataSchema _dataSchema;
+  private DataTableReducerContext _dataTableReducerContext;
+  // Rows collected so far; never grows beyond _limit.
+  private List<Object[]> _rows;
+
+  public SelectionOnlyStreamingReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _limit = _queryContext.getLimit();
+    Map<String, String> queryOptions = queryContext.getQueryOptions();
+    Preconditions.checkState(QueryOptionsUtils.isResponseFormatSQL(queryOptions), "only SQL response is supported");
+
+    _preserveType = QueryOptionsUtils.isPreserveType(queryOptions);
+    _dataSchema = null;
+  }
+
+  @Override
+  public void init(DataTableReducerContext dataTableReducerContext) {
+    _dataTableReducerContext = dataTableReducerContext;
+    _rows = new ArrayList<>(Math.min(_limit, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public synchronized void reduce(ServerRoutingInstance key, DataTable dataTable) {
+    // The first schema seen wins.
+    if (_dataSchema == null) {
+      _dataSchema = dataTable.getDataSchema();
+    }
+    // TODO: For data table map with more than one data tables, remove conflicting data tables
+    reduceWithoutOrdering(dataTable, _limit);
+  }
+
+  /**
+   * Appends rows from {@code dataTable}, in table order, until {@code limit} rows have been collected in total.
+   */
+  private void reduceWithoutOrdering(DataTable dataTable, int limit) {
+    int numRowsToAdd = Math.min(dataTable.getNumberOfRows(), limit - _rows.size());
+    for (int rowId = 0; rowId < numRowsToAdd; rowId++) {
+      _rows.add(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId));
+    }
+  }
+
+  /**
+   * Builds the final response from the rows collected so far.
+   *
+   * <p>Synchronized so that it observes a consistent row list, even if a worker is still inside {@link #reduce}.
+   *
+   * @return a response with a result table, or a metadata-only response when no server returned a data block
+   */
+  @Override
+  public synchronized BrokerResponseNative seal() {
+    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
+    if (_dataSchema == null) {
+      // No server returned a data block (e.g. every server failed), so there is no schema to build a result table
+      // from. Mirror the non-streaming reduce and return the response with metadata only.
+      return brokerResponseNative;
+    }
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, _dataSchema);
+    if (!_rows.isEmpty()) {
+      brokerResponseNative.setResultTable(
+          SelectionOperatorUtils.renderResultTableWithoutOrdering(_rows, _dataSchema, selectionColumns));
+    } else {
+      // No rows: construct an empty result using the cached data schema for the selection query.
+      DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(_dataSchema, selectionColumns);
+      brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList()));
+    }
+    return brokerResponseNative;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
new file mode 100644
index 0000000..3a80494
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The <code>StreamingReduceService</code> class provides service to reduce gRPC responses streamed from multiple
+ * servers into a {@link BrokerResponseNative}.
+ */
+@ThreadSafe
+public class StreamingReduceService extends BaseReduceService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamingReduceService.class);
+
+  public StreamingReduceService(PinotConfiguration config) {
+    super(config);
+  }
+
+  /**
+   * Reduces the per-server streaming responses into a single broker response.
+   *
+   * @param brokerRequest the original broker request
+   * @param serverResponseMap for each server, an iterator over its streamed responses
+   * @param reduceTimeOutMs maximum time to wait for all server streams to be consumed
+   * @param brokerMetrics optional metrics sink for execution statistics
+   * @return the reduced response; an empty response if {@code serverResponseMap} is empty
+   * @throws IOException if any server stream fails to be processed, or the reduce times out or is interrupted
+   */
+  public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest,
+      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
+      @Nullable BrokerMetrics brokerMetrics)
+      throws IOException {
+    if (serverResponseMap.isEmpty()) {
+      // Empty response.
+      return BrokerResponseNative.empty();
+    }
+
+    // prepare contextual info for reduce.
+    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+    Map<String, String> queryOptions =
+        pinotQuery != null ? pinotQuery.getQueryOptions() : brokerRequest.getQueryOptions();
+    boolean enableTrace =
+        queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
+
+    QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
+
+    String tableName = brokerRequest.getQuerySource().getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+
+    // initialize empty response.
+    ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
+
+    // Process server response.
+    DataTableReducerContext dataTableReducerContext =
+        new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
+            _groupByTrimThreshold);
+    StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext);
+
+    streamingReducer.init(dataTableReducerContext);
+
+    try {
+      processIterativeServerResponse(streamingReducer, _reduceExecutorService, serverResponseMap, reduceTimeOutMs,
+          aggregator);
+    } catch (Exception e) {
+      LOGGER.error("Unable to process streaming query response!", e);
+      throw new IOException("Unable to process streaming query response!", e);
+    }
+
+    // seal the streaming response.
+    BrokerResponseNative brokerResponseNative = streamingReducer.seal();
+
+    // Set execution statistics and Update broker metrics.
+    aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
+
+    updateAlias(queryContext, brokerResponseNative);
+    return brokerResponseNative;
+  }
+
+  /**
+   * Consumes every server stream on {@code executorService}, one task per server. Data blocks are fed to
+   * {@code reducer}; metadata-only blocks (null data schema) are fed to {@code aggregator}.
+   *
+   * @throws TimeoutException if not all streams are consumed within {@code reduceTimeOutMs}, or the wait is
+   *     interrupted; unfinished tasks are cancelled first
+   * @throws java.util.concurrent.ExecutionException if any task failed while processing its stream
+   */
+  private static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService,
+      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
+      ExecutionStatsAggregator aggregator)
+      throws Exception {
+    int numServers = serverResponseMap.size();
+    Future<?>[] futures = new Future<?>[numServers];
+    CountDownLatch countDownLatch = new CountDownLatch(numServers);
+
+    int futureIndex = 0;
+    for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry : serverResponseMap.entrySet()) {
+      ServerRoutingInstance serverRoutingInstance = entry.getKey();
+      Iterator<Server.ServerResponse> streamingResponses = entry.getValue();
+      futures[futureIndex++] = executorService.submit(() -> {
+        try {
+          while (streamingResponses.hasNext()) {
+            Server.ServerResponse streamingResponse = streamingResponses.next();
+            DataTable dataTable = DataTableFactory.getDataTable(streamingResponse.getPayload().asReadOnlyByteBuffer());
+            // null dataSchema is a metadata-only block.
+            if (dataTable.getDataSchema() != null) {
+              reducer.reduce(serverRoutingInstance, dataTable);
+            } else {
+              // aggregate() is reached from several worker tasks at once; serialize it because
+              // ExecutionStatsAggregator is not documented as thread-safe (NOTE(review): drop if it becomes so).
+              synchronized (aggregator) {
+                aggregator.aggregate(serverRoutingInstance, dataTable);
+              }
+            }
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Unable to process streaming response. Failure occurred!", e);
+        } finally {
+          countDownLatch.countDown();
+        }
+      });
+    }
+
+    boolean finished;
+    try {
+      finished = countDownLatch.await(reduceTimeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag for the caller, stop the workers and report the failure.
+      Thread.currentThread().interrupt();
+      cancelUnfinished(futures);
+      TimeoutException timeoutException = new TimeoutException("Timed out in broker reduce phase.");
+      timeoutException.initCause(e);
+      throw timeoutException;
+    }
+    if (!finished) {
+      // await() returns false on timeout; without this check the response would be sealed while workers are
+      // still reducing.
+      cancelUnfinished(futures);
+      throw new TimeoutException("Timed out in broker reduce phase.");
+    }
+    // Every task has counted down, so these calls return promptly. Future.get() rethrows a task failure wrapped in
+    // an ExecutionException; otherwise the RuntimeException thrown inside the task would be silently dropped.
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
+
+  /**
+   * Cancels, with interruption, every reduce task that has not completed yet.
+   */
+  private static void cancelUnfinished(Future<?>[] futures) {
+    for (Future<?> future : futures) {
+      if (!future.isDone()) {
+        future.cancel(true);
+      }
+    }
+  }
+
+  /**
+   * Shuts down the reduce executor via {@code shutdownNow()}, which attempts to interrupt running reduce tasks.
+   */
+  public void shutDown() {
+    _reduceExecutorService.shutdownNow();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java
new file mode 100644
index 0000000..6d8a6e2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+
+
+/**
+ * Incrementally reduces data tables streamed back from servers into a single {@link BrokerResponseNative}.
+ *
+ * <p>As driven by {@code StreamingReduceService}, the call sequence is:
+ * <ol>
+ *   <li>{@link #init} once;</li>
+ *   <li>{@link #reduce} for every data block received, from executor worker tasks and possibly concurrently for
+ *   different servers;</li>
+ *   <li>{@link #seal} once.</li>
+ * </ol>
+ */
+public interface StreamingReducer {
+
+ /**
+  * Prepares the reducer before any data table is passed to {@link #reduce}.
+  */
+ void init(DataTableReducerContext dataTableReducerContext);
+
+ /**
+  * Accumulates one data block returned by server {@code key}. It may be invoked concurrently for different servers,
+  * so implementations must be safe for concurrent calls.
+  */
+ void reduce(ServerRoutingInstance key, DataTable dataTable);
+
+ /**
+  * Builds the final broker response from everything accumulated by {@link #reduce}.
+  */
+ BrokerResponseNative seal();
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
index a4cc1b9..09ffb2f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
@@ -30,6 +30,7 @@ public class ServerInstance {
private final String _hostname;
private final int _port;
+ private final int _grpcPort;
private final int _tlsPort;
/**
@@ -58,16 +59,20 @@ public class ServerInstance {
}
int tlsPort = -1;
+ int grpcPort = -1;
if (instanceConfig.getRecord() != null) {
tlsPort = instanceConfig.getRecord().getIntField(Helix.Instance.NETTYTLS_PORT_KEY, -1);
+ grpcPort = instanceConfig.getRecord().getIntField(Helix.Instance.GRPC_PORT_KEY, -1);
}
_tlsPort = tlsPort;
+ _grpcPort = grpcPort;
}
@VisibleForTesting
ServerInstance(String hostname, int port) {
_hostname = hostname;
_port = port;
+ _grpcPort = -1;
_tlsPort = -1;
}
@@ -79,10 +84,15 @@ public class ServerInstance {
return _port;
}
+ /**
+  * Returns the gRPC port read from the instance config ({@code GRPC_PORT_KEY}), or -1 when it is not configured.
+  */
+ public int getGrpcPort() {
+ return _grpcPort;
+ }
+
public ServerRoutingInstance toServerRoutingInstance(TableType tableType) {
return new ServerRoutingInstance(_hostname, _port, tableType);
}
+ @Deprecated
public ServerRoutingInstance toServerRoutingInstance(TableType tableType, boolean preferTls) {
if (!preferTls) {
return toServerRoutingInstance(tableType);
@@ -95,6 +105,26 @@ public class ServerInstance {
return new ServerRoutingInstance(_hostname, _tlsPort, tableType, true);
}
+ /**
+  * Builds a routing instance whose port matches the requested connection type.
+  *
+  * <p>{@code GRPC} uses the gRPC port without TLS. {@code TTS} uses the TLS port with TLS enabled. If the requested
+  * port is not configured (not positive), this falls back to the plain netty port, as {@code DEFAULT} does.
+  *
+  * @param tableType table type the routing instance is for
+  * @param type requested connection type; must not be null
+  */
+ public ServerRoutingInstance toServerRoutingInstance(TableType tableType, Type type) {
+   switch (type) {
+     case GRPC:
+       return _grpcPort > 0 ? new ServerRoutingInstance(_hostname, _grpcPort, tableType, false)
+           : toServerRoutingInstance(tableType);
+     case TTS:
+       return _tlsPort > 0 ? new ServerRoutingInstance(_hostname, _tlsPort, tableType, true)
+           : toServerRoutingInstance(tableType);
+     default:
+       // DEFAULT: plain netty port.
+       return toServerRoutingInstance(tableType);
+   }
+ }
+
@Override
public int hashCode() {
return 31 * _hostname.hashCode() + _port;
@@ -119,4 +149,10 @@ public class ServerInstance {
public String toString() {
return Helix.PREFIX_OF_SERVER_INSTANCE + _hostname + HOSTNAME_PORT_DELIMITER + _port;
}
+
+ /**
+  * Selects which server port {@link #toServerRoutingInstance(TableType, Type)} routes to.
+  */
+ public enum Type {
+ // Plain netty query port.
+ DEFAULT,
+ // gRPC port when configured (> 0), otherwise the netty port.
+ GRPC,
+ // TLS port when configured (> 0), otherwise the netty port. NOTE(review): name looks like a typo for "TLS".
+ TTS
+ }
}
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index e4abe2c..0ba4cc1 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -28,6 +28,7 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -100,16 +101,21 @@ public abstract class ClusterTest extends ControllerTest {
protected void startBroker(int port, String zkStr)
throws Exception {
- startBrokers(1, port, zkStr);
+ startBrokers(1, port, zkStr, Collections.emptyMap());
}
protected void startBrokers(int numBrokers)
throws Exception {
- startBrokers(numBrokers, DEFAULT_BROKER_PORT, getZkUrl());
+ startBrokers(numBrokers, DEFAULT_BROKER_PORT, getZkUrl(), Collections.emptyMap());
}
protected void startBrokers(int numBrokers, int basePort, String zkStr)
throws Exception {
+ startBrokers(numBrokers, basePort, zkStr, Collections.emptyMap());
+ }
+
+ protected void startBrokers(int numBrokers, int basePort, String zkStr, Map<String, Object> extraProperties)
+ throws Exception {
_brokerStarters = new ArrayList<>(numBrokers);
_brokerPorts = new ArrayList<>();
for (int i = 0; i < numBrokers; i++) {
@@ -121,6 +127,7 @@ public abstract class ClusterTest extends ControllerTest {
_brokerPorts.add(port);
properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, port);
properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+ properties.putAll(extraProperties);
PinotConfiguration configuration = new PinotConfiguration(properties);
overrideBrokerConf(configuration);
@@ -185,12 +192,15 @@ public abstract class ClusterTest extends ControllerTest {
}
protected void startServer(PinotConfiguration configuration) {
- startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
+ startServers(1, configuration);
}
protected void startServers(int numServers) {
- startServers(numServers, getDefaultServerConfiguration(), Server.DEFAULT_ADMIN_API_PORT,
- Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
+ startServers(numServers, getDefaultServerConfiguration());
+ }
+
+ protected void startServers(int numServers, PinotConfiguration configuration) {
+ startServers(numServers, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
}
protected void startServers(int numServers, int baseAdminApiPort, int baseNettyPort, String zkStr) {
@@ -199,6 +209,11 @@ public abstract class ClusterTest extends ControllerTest {
protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
String zkStr) {
+ startServers(numServers, configuration, baseAdminApiPort, baseNettyPort, Server.DEFAULT_GRPC_PORT, zkStr);
+ }
+
+ protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
+ int baseGrpcPort, String zkStr) {
FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
_serverStarters = new ArrayList<>(numServers);
overrideServerConf(configuration);
@@ -211,6 +226,10 @@ public abstract class ClusterTest extends ControllerTest {
.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
+ if (configuration.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, false)) {
+ configuration.setProperty(Server.CONFIG_OF_GRPC_PORT, baseGrpcPort + i);
+ }
+ configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
// Thread time measurement is disabled by default, enable it in integration tests.
// TODO: this can be removed when we eventually enable thread time measurement by default.
configuration.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
new file mode 100644
index 0000000..3a69e24
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for the gRPC broker request handler: runs selection-only queries against a hybrid cluster whose
+ * broker uses the {@code grpc} request handler and whose servers have the gRPC query server enabled.
+ */
+public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final String TENANT_NAME = "TestTenant";
+  private static final int NUM_OFFLINE_SEGMENTS = 8;
+  private static final int NUM_REALTIME_SEGMENTS = 6;
+
+  @Override
+  protected String getBrokerTenant() {
+    return TENANT_NAME;
+  }
+
+  @Override
+  protected String getServerTenant() {
+    return TENANT_NAME;
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) {
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start Zk, Kafka and Pinot
+    startHybridCluster();
+
+    List<File> avroFiles = getAllAvroFiles();
+    List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS);
+    List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    addTableConfig(offlineTableConfig);
+    addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Push data into Kafka
+    pushAvroIntoKafka(realtimeAvroFiles);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // TODO: waitForAllDocsLoaded(600_000L) does not work here yet, so simply wait 5 seconds instead. To be fixed
+    //  after https://github.com/apache/pinot/pull/7839
+    Thread.sleep(5000);
+  }
+
+  protected void startHybridCluster()
+      throws Exception {
+    // Start Zk and Kafka
+    startZk();
+    startKafka();
+
+    // Start the Pinot cluster
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
+    startController(properties);
+
+    // Start the broker with the gRPC broker request handler instead of the default (netty) one
+    startBrokers(1, DEFAULT_BROKER_PORT, getZkUrl(), Collections.singletonMap(
+        CommonConstants.Broker.BROKER_REQUEST_HANDLER_TYPE, CommonConstants.Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE));
+
+    // Enable gRPC server
+    PinotConfiguration serverConfig = getDefaultServerConfiguration();
+    serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
+    startServers(2, serverConfig);
+
+    // Create tenants
+    createBrokerTenant(TENANT_NAME, 1);
+    createServerTenant(TENANT_NAME, 1, 1);
+  }
+
+  @Test
+  public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery()
+      throws Exception {
+    String sql = "SELECT * FROM mytable LIMIT 1000000";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT * FROM mytable WHERE DaysSinceEpoch > 16312 LIMIT 10000000";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable LIMIT 10000000";
+    testSqlQuery(sql, Collections.singletonList(sql));
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 17ad6da..721e3d8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -217,6 +217,11 @@ public class CommonConstants {
public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+ public static final String BROKER_REQUEST_HANDLER_TYPE = "pinot.broker.request.handler.type";
+ public static final String NETTY_BROKER_REQUEST_HANDLER_TYPE = "netty";
+ public static final String GRPC_BROKER_REQUEST_HANDLER_TYPE = "grpc";
+ public static final String DEFAULT_BROKER_REQUEST_HANDLER_TYPE = NETTY_BROKER_REQUEST_HANDLER_TYPE;
+
public static final String BROKER_TLS_PREFIX = "pinot.broker.tls";
public static final String BROKER_NETTYTLS_ENABLED = "pinot.broker.nettytls.enabled";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org