You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/11 02:36:31 UTC

[pinot] branch master updated: GRPC broker request handler (#7838)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 12edbdb  GRPC broker request handler  (#7838)
12edbdb is described below

commit 12edbdb1e521e0d9ee35a2d6ef2fed172e5beaa1
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Fri Dec 10 18:36:05 2021 -0800

    GRPC broker request handler  (#7838)
    
    Adds a GrpcBrokerRequestHandler framework in parallel to the SingleConnectionBrokerRequestHandler.
    
    This handler receives data streamed back from the servers and performs the reduce in a streaming fashion.
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  22 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |   4 -
 .../requesthandler/GrpcBrokerRequestHandler.java   | 156 +++++++++++
 .../SingleConnectionBrokerRequestHandler.java      |   4 +
 .../pinot/core/query/reduce/BaseReduceService.java | 305 +++++++++++++++++++++
 .../core/query/reduce/BrokerReduceService.java     | 244 +----------------
 .../core/query/reduce/ResultReducerFactory.java    |  10 +
 .../reduce/SelectionOnlyStreamingReducer.java      |  98 +++++++
 .../core/query/reduce/StreamingReduceService.java  | 153 +++++++++++
 .../pinot/core/query/reduce/StreamingReducer.java  |  33 +++
 .../pinot/core/transport/ServerInstance.java       |  36 +++
 .../pinot/integration/tests/ClusterTest.java       |  29 +-
 .../tests/GrpcBrokerClusterIntegrationTest.java    | 131 +++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   5 +
 14 files changed, 980 insertions(+), 250 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 527719a..b7e4c2d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -41,6 +41,7 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
 import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
 import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
 import org.apache.pinot.broker.routing.RoutingManager;
 import org.apache.pinot.common.Utils;
@@ -235,14 +236,23 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     // Configure TLS for netty connection to server
     TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
 
-    if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
+    if (_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE)
+        .equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
+      LOGGER.info("Starting Grpc BrokerRequestHandler.");
       _brokerRequestHandler =
-          new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
-              queryQuotaManager, tableCache, _brokerMetrics, tlsDefaults);
-    } else {
-      _brokerRequestHandler =
-          new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+          new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
               queryQuotaManager, tableCache, _brokerMetrics, null);
+    } else { // default request handler type, e.g. netty
+      LOGGER.info("Starting Netty BrokerRequestHandler.");
+      if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
+        _brokerRequestHandler =
+            new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+                queryQuotaManager, tableCache, _brokerMetrics, tlsDefaults);
+      } else {
+        _brokerRequestHandler =
+            new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+                queryQuotaManager, tableCache, _brokerMetrics, null);
+      }
     }
 
     LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 8cbd52e..bb8f475 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -78,7 +78,6 @@ import org.apache.pinot.common.utils.helix.TableCache;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
-import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.util.QueryOptionsUtils;
@@ -117,7 +116,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
   protected final QueryOptimizer _queryOptimizer = new QueryOptimizer();
-  protected final BrokerReduceService _brokerReduceService;
 
   protected final String _brokerId;
   protected final long _brokerTimeoutMs;
@@ -159,8 +157,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
     _numDroppedLog = new AtomicInteger(0);
     _numDroppedLogRateLimiter = RateLimiter.create(1.0);
-
-    _brokerReduceService = new BrokerReduceService(_config);
     LOGGER.info(
         "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
         _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
new file mode 100644
index 0000000..7d0722e
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.broker.api.RequestStatistics;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.RoutingManager;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
+import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
+import org.apache.pinot.common.utils.helix.TableCache;
+import org.apache.pinot.core.query.reduce.StreamingReduceService;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.transport.TlsConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * The <code>GrpcBrokerRequestHandler</code> class sends query requests to servers and reads their responses via gRPC.
+ */
+@ThreadSafe
+public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
+
+  private final GrpcQueryClient.Config _grpcConfig;
+  private final StreamingReduceService _streamingReduceService;
+  private final PinotStreamingQueryClient _streamingQueryClient;
+
+  public GrpcBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
+      BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
+    super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+    _grpcConfig = buildGrpcQueryClientConfig(config);
+
+    // create streaming query client
+    _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig);
+
+    // create streaming reduce service
+    _streamingReduceService = new StreamingReduceService(config);
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public synchronized void shutDown() {
+    _streamingReduceService.shutDown();
+  }
+
+  @Override
+  protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
+      @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+      @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
+      long timeoutMs, ServerStats serverStats, RequestStatistics requestStatistics)
+      throws Exception {
+    assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
+
+    String rawTableName = TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName());
+    Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>();
+    // Send the request to each routed server and collect the streaming responses.
+
+    if (offlineBrokerRequest != null) {
+      // to offline servers.
+      assert offlineRoutingTable != null;
+      streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.OFFLINE, responseMap,
+          offlineBrokerRequest, offlineRoutingTable, timeoutMs, true, 1);
+    }
+    if (realtimeBrokerRequest != null) {
+      // to realtime servers.
+      assert realtimeRoutingTable != null;
+      streamingQueryToPinotServer(requestId, _brokerId, rawTableName, TableType.REALTIME, responseMap,
+          realtimeBrokerRequest, realtimeRoutingTable, timeoutMs, true, 1);
+    }
+    BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(
+        originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics);
+    return brokerResponse;
+  }
+
+  /**
+   * Submits the query to each server in the routing table, storing the streaming responses in responseMap.
+   */
+  public void streamingQueryToPinotServer(final long requestId, final String brokerHost, final String rawTableName,
+      final TableType tableType, Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap,
+      BrokerRequest brokerRequest, Map<ServerInstance, List<String>> routingTable, long connectionTimeoutInMillis,
+      boolean ignoreEmptyResponses, int pinotRetryCount) {
+    // Retries will all hit the same server because the routing decision has already been made by the pinot broker
+    Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap = new HashMap<>();
+    for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) {
+      ServerInstance serverInstance = routingEntry.getKey();
+      List<String> segments = routingEntry.getValue();
+      String serverHost = serverInstance.getHostname();
+      int port = serverInstance.getGrpcPort();
+      // TODO: enable throttling on a per-host basis.
+      Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port,
+          new GrpcRequestBuilder()
+              .setSegments(segments)
+              .setBrokerRequest(brokerRequest)
+              .setEnableStreaming(true));
+      responseMap.put(serverInstance.toServerRoutingInstance(tableType), streamingResponse);
+    }
+  }
+
+  // return empty config for now
+  private GrpcQueryClient.Config buildGrpcQueryClientConfig(PinotConfiguration config) {
+    return new GrpcQueryClient.Config();
+  }
+
+  public static class PinotStreamingQueryClient {
+    private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new ConcurrentHashMap<>();
+    private final GrpcQueryClient.Config _config;
+
+    public PinotStreamingQueryClient(GrpcQueryClient.Config config) {
+      _config = config;
+    }
+
+    public Iterator<Server.ServerResponse> submit(String host, int port, GrpcRequestBuilder requestBuilder) {
+      GrpcQueryClient client = getOrCreateGrpcQueryClient(host, port);
+      return client.submit(requestBuilder.build());
+    }
+
+    private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port) {
+      String key = String.format("%s_%d", host, port);
+      return _grpcQueryClientMap.computeIfAbsent(key, k -> new GrpcQueryClient(host, port, _config));
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 76deaa5..f585df72 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.helix.TableCache;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.transport.AsyncQueryResponse;
 import org.apache.pinot.core.transport.QueryRouter;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -55,12 +56,15 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
  */
 @ThreadSafe
 public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandler {
+  private final BrokerReduceService _brokerReduceService;
   private final QueryRouter _queryRouter;
 
   public SingleConnectionBrokerRequestHandler(PinotConfiguration config, RoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
       BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
     super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+
+    _brokerReduceService = new BrokerReduceService(_config);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics, tlsConfig);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
new file mode 100644
index 0000000..6a6d1c6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.BrokerTimer;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for reduce services; it owns the shared reduce thread pool and the execution stats aggregator.
+ */
+@ThreadSafe
+public abstract class BaseReduceService {
+
+  // Set the reducer priority higher than NORM but lower than MAX, because if a query is complete
+  // we want to deserialize and return response as soon as possible. This is the same as server side 'pqr' threads.
+  protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;
+  // brw -> Shorthand for broker reduce worker threads.
+  protected static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseReduceService.class);
+
+  protected final ExecutorService _reduceExecutorService;
+  protected final int _maxReduceThreadsPerQuery;
+  protected final int _groupByTrimThreshold;
+
+  public BaseReduceService(PinotConfiguration config) {
+    _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
+        CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
+    _groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
+        CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
+
+    int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
+    LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
+        numThreadsInExecutorService, _maxReduceThreadsPerQuery);
+
+    ThreadFactory reduceThreadFactory =
+        new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY)
+            .setNameFormat(REDUCE_THREAD_NAME_FORMAT).build();
+
+    // ExecutorService is initialized with numThreads same as availableProcessors.
+    _reduceExecutorService = Executors.newFixedThreadPool(numThreadsInExecutorService, reduceThreadFactory);
+  }
+
+  protected static void updateAlias(QueryContext queryContext, BrokerResponseNative brokerResponseNative) {
+    ResultTable resultTable = brokerResponseNative.getResultTable();
+    if (resultTable == null) {
+      return;
+    }
+    List<String> aliasList = queryContext.getAliasList();
+    if (aliasList.isEmpty()) {
+      return;
+    }
+
+    String[] columnNames = resultTable.getDataSchema().getColumnNames();
+    List<ExpressionContext> selectExpressions = getSelectExpressions(queryContext.getSelectExpressions());
+    int numSelectExpressions = selectExpressions.size();
+    // For query like `SELECT *`, we skip alias update.
+    if (columnNames.length != numSelectExpressions) {
+      return;
+    }
+    for (int i = 0; i < numSelectExpressions; i++) {
+      String alias = aliasList.get(i);
+      if (alias != null) {
+        columnNames[i] = alias;
+      }
+    }
+  }
+
+  protected static List<ExpressionContext> getSelectExpressions(List<ExpressionContext> selectExpressions) {
+    // NOTE: For DISTINCT queries, need to extract the arguments as the SELECT expressions
+    if (selectExpressions.size() == 1 && selectExpressions.get(0).getType() == ExpressionContext.Type.FUNCTION
+        && selectExpressions.get(0).getFunction().getFunctionName().equals("distinct")) {
+      return selectExpressions.get(0).getFunction().getArguments();
+    }
+    return selectExpressions;
+  }
+
+  protected void shutDown() {
+    _reduceExecutorService.shutdownNow();
+  }
+
+  protected static class ExecutionStatsAggregator {
+    private final List<QueryProcessingException> _processingExceptions = new ArrayList<>();
+    private final Map<String, String> _traceInfo = new HashMap<>();
+    private final boolean _enableTrace;
+
+    private long _numDocsScanned = 0L;
+    private long _numEntriesScannedInFilter = 0L;
+    private long _numEntriesScannedPostFilter = 0L;
+    private long _numSegmentsQueried = 0L;
+    private long _numSegmentsProcessed = 0L;
+    private long _numSegmentsMatched = 0L;
+    private long _numConsumingSegmentsProcessed = 0L;
+    private long _minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+    private long _numTotalDocs = 0L;
+    private long _offlineThreadCpuTimeNs = 0L;
+    private long _realtimeThreadCpuTimeNs = 0L;
+    private long _offlineSystemActivitiesCpuTimeNs = 0L;
+    private long _realtimeSystemActivitiesCpuTimeNs = 0L;
+    private long _offlineResponseSerializationCpuTimeNs = 0L;
+    private long _realtimeResponseSerializationCpuTimeNs = 0L;
+    private long _offlineTotalCpuTimeNs = 0L;
+    private long _realtimeTotalCpuTimeNs = 0L;
+    private boolean _numGroupsLimitReached = false;
+
+    protected ExecutionStatsAggregator(boolean enableTrace) {
+      _enableTrace = enableTrace;
+    }
+
+    protected synchronized void aggregate(ServerRoutingInstance routingInstance, DataTable dataTable) {
+      Map<String, String> metadata = dataTable.getMetadata();
+      // Reduce on trace info.
+      if (_enableTrace) {
+        _traceInfo.put(routingInstance.getHostname(), metadata.get(MetadataKey.TRACE_INFO.getName()));
+      }
+
+      // Reduce on exceptions.
+      Map<Integer, String> exceptions = dataTable.getExceptions();
+      for (int key : exceptions.keySet()) {
+        _processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
+      }
+
+      // Reduce on execution statistics.
+      String numDocsScannedString = metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
+      if (numDocsScannedString != null) {
+        _numDocsScanned += Long.parseLong(numDocsScannedString);
+      }
+      String numEntriesScannedInFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
+      if (numEntriesScannedInFilterString != null) {
+        _numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
+      }
+      String numEntriesScannedPostFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
+      if (numEntriesScannedPostFilterString != null) {
+        _numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
+      }
+      String numSegmentsQueriedString = metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
+      if (numSegmentsQueriedString != null) {
+        _numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
+      }
+
+      String numSegmentsProcessedString = metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
+      if (numSegmentsProcessedString != null) {
+        _numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
+      }
+      String numSegmentsMatchedString = metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
+      if (numSegmentsMatchedString != null) {
+        _numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
+      }
+
+      String numConsumingString = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
+      if (numConsumingString != null) {
+        _numConsumingSegmentsProcessed += Long.parseLong(numConsumingString);
+      }
+
+      String minConsumingFreshnessTimeMsString = metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
+      if (minConsumingFreshnessTimeMsString != null) {
+        _minConsumingFreshnessTimeMs =
+            Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), _minConsumingFreshnessTimeMs);
+      }
+
+      String threadCpuTimeNsString = metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
+      if (threadCpuTimeNsString != null) {
+        if (routingInstance.getTableType() == TableType.OFFLINE) {
+          _offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+        } else {
+          _realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
+        }
+      }
+
+      String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
+      if (systemActivitiesCpuTimeNsString != null) {
+        if (routingInstance.getTableType() == TableType.OFFLINE) {
+          _offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+        } else {
+          _realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+        }
+      }
+
+      String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+      if (responseSerializationCpuTimeNsString != null) {
+        if (routingInstance.getTableType() == TableType.OFFLINE) {
+          _offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+        } else {
+          _realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+        }
+      }
+      _offlineTotalCpuTimeNs =
+          _offlineThreadCpuTimeNs + _offlineSystemActivitiesCpuTimeNs + _offlineResponseSerializationCpuTimeNs;
+      _realtimeTotalCpuTimeNs =
+          _realtimeThreadCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
+
+      String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName());
+      if (numTotalDocsString != null) {
+        _numTotalDocs += Long.parseLong(numTotalDocsString);
+      }
+      _numGroupsLimitReached |= Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+    }
+
+    protected void setStats(String rawTableName, BrokerResponseNative brokerResponseNative,
+        BrokerMetrics brokerMetrics) {
+      // set exception
+      List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
+      processingExceptions.addAll(_processingExceptions);
+
+      // add all trace.
+      if (_enableTrace) {
+        brokerResponseNative.getTraceInfo().putAll(_traceInfo);
+      }
+
+      // Set execution statistics.
+      brokerResponseNative.setNumDocsScanned(_numDocsScanned);
+      brokerResponseNative.setNumEntriesScannedInFilter(_numEntriesScannedInFilter);
+      brokerResponseNative.setNumEntriesScannedPostFilter(_numEntriesScannedPostFilter);
+      brokerResponseNative.setNumSegmentsQueried(_numSegmentsQueried);
+      brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
+      brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
+      brokerResponseNative.setTotalDocs(_numTotalDocs);
+      brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
+      brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
+      brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
+      brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
+      brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(_realtimeSystemActivitiesCpuTimeNs);
+      brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(_offlineResponseSerializationCpuTimeNs);
+      brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(_realtimeResponseSerializationCpuTimeNs);
+      brokerResponseNative.setOfflineTotalCpuTimeNs(_offlineTotalCpuTimeNs);
+      brokerResponseNative.setRealtimeTotalCpuTimeNs(_realtimeTotalCpuTimeNs);
+      if (_numConsumingSegmentsProcessed > 0) {
+        brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsProcessed);
+        brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
+      }
+
+      // Update broker metrics.
+      if (brokerMetrics != null) {
+        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, _numDocsScanned);
+        brokerMetrics
+            .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, _numEntriesScannedInFilter);
+        brokerMetrics
+            .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, _numEntriesScannedPostFilter);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, _offlineThreadCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS,
+            _realtimeThreadCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+            _offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+            _realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
+            _offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
+            _realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, _offlineTotalCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+
+        if (_numConsumingSegmentsProcessed > 0 && _minConsumingFreshnessTimeMs > 0) {
+          brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
+              System.currentTimeMillis() - _minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index fa22e54..f2796fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -18,37 +18,22 @@
  */
 package org.apache.pinot.core.query.reduce;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -56,68 +41,9 @@ import org.slf4j.LoggerFactory;
  * to {@link BrokerResponseNative}.
  */
 @ThreadSafe
-public class BrokerReduceService {
-
-  // Set the reducer priority higher than NORM but lower than MAX, because if a query is complete
-  // we want to deserialize and return response as soon. This is the same as server side 'pqr' threads.
-  protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;
-  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
-  // brw -> Shorthand for broker reduce worker threads.
-  private static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";
-  private final ExecutorService _reduceExecutorService;
-  private final int _maxReduceThreadsPerQuery;
-  private final int _groupByTrimThreshold;
-
+public class BrokerReduceService extends BaseReduceService {
   public BrokerReduceService(PinotConfiguration config) {
-    _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
-        CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);
-    _groupByTrimThreshold = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD,
-        CommonConstants.Broker.DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD);
-
-    int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
-    LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
-        numThreadsInExecutorService, _maxReduceThreadsPerQuery);
-
-    ThreadFactory reduceThreadFactory =
-        new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY)
-            .setNameFormat(REDUCE_THREAD_NAME_FORMAT).build();
-
-    // ExecutorService is initialized with numThreads same as availableProcessors.
-    _reduceExecutorService = Executors.newFixedThreadPool(numThreadsInExecutorService, reduceThreadFactory);
-  }
-
-  private static void updateAlias(QueryContext queryContext, BrokerResponseNative brokerResponseNative) {
-    ResultTable resultTable = brokerResponseNative.getResultTable();
-    if (resultTable == null) {
-      return;
-    }
-    List<String> aliasList = queryContext.getAliasList();
-    if (aliasList.isEmpty()) {
-      return;
-    }
-
-    String[] columnNames = resultTable.getDataSchema().getColumnNames();
-    List<ExpressionContext> selectExpressions = getSelectExpressions(queryContext.getSelectExpressions());
-    int numSelectExpressions = selectExpressions.size();
-    // For query like `SELECT *`, we skip alias update.
-    if (columnNames.length != numSelectExpressions) {
-      return;
-    }
-    for (int i = 0; i < numSelectExpressions; i++) {
-      String alias = aliasList.get(i);
-      if (alias != null) {
-        columnNames[i] = alias;
-      }
-    }
-  }
-
-  private static List<ExpressionContext> getSelectExpressions(List<ExpressionContext> selectExpressions) {
-    // NOTE: For DISTINCT queries, need to extract the arguments as the SELECT expressions
-    if (selectExpressions.size() == 1 && selectExpressions.get(0).getType() == ExpressionContext.Type.FUNCTION
-        && selectExpressions.get(0).getFunction().getFunctionName().equals("distinct")) {
-      return selectExpressions.get(0).getFunction().getArguments();
-    }
-    return selectExpressions;
+    super(config);
   }
 
   public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest,
@@ -127,34 +53,15 @@ public class BrokerReduceService {
       return BrokerResponseNative.empty();
     }
 
-    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
-    List<QueryProcessingException> processingExceptions = brokerResponseNative.getProcessingExceptions();
-    long numDocsScanned = 0L;
-    long numEntriesScannedInFilter = 0L;
-    long numEntriesScannedPostFilter = 0L;
-    long numSegmentsQueried = 0L;
-    long numSegmentsProcessed = 0L;
-    long numSegmentsMatched = 0L;
-    long numConsumingSegmentsProcessed = 0L;
-    long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
-    long numTotalDocs = 0L;
-    long offlineThreadCpuTimeNs = 0L;
-    long realtimeThreadCpuTimeNs = 0L;
-    long offlineSystemActivitiesCpuTimeNs = 0L;
-    long realtimeSystemActivitiesCpuTimeNs = 0L;
-    long offlineResponseSerializationCpuTimeNs = 0L;
-    long realtimeResponseSerializationCpuTimeNs = 0L;
-    long offlineTotalCpuTimeNs = 0L;
-    long realtimeTotalCpuTimeNs = 0L;
-
-    boolean numGroupsLimitReached = false;
-
     PinotQuery pinotQuery = brokerRequest.getPinotQuery();
     Map<String, String> queryOptions =
         pinotQuery != null ? pinotQuery.getQueryOptions() : brokerRequest.getQueryOptions();
     boolean enableTrace =
         queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
 
+    ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
+    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
+
     // Cache a data schema from data tables (try to cache one with data rows associated with it).
     DataSchema cachedDataSchema = null;
 
@@ -163,94 +70,9 @@ public class BrokerReduceService {
     while (iterator.hasNext()) {
       Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
       DataTable dataTable = entry.getValue();
-      Map<String, String> metadata = dataTable.getMetadata();
-
-      // Reduce on trace info.
-      if (enableTrace) {
-        brokerResponseNative.getTraceInfo()
-            .put(entry.getKey().getHostname(), metadata.get(MetadataKey.TRACE_INFO.getName()));
-      }
-
-      // Reduce on exceptions.
-      Map<Integer, String> exceptions = dataTable.getExceptions();
-      for (int key : exceptions.keySet()) {
-        processingExceptions.add(new QueryProcessingException(key, exceptions.get(key)));
-      }
-
-      // Reduce on execution statistics.
-      String numDocsScannedString = metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
-      if (numDocsScannedString != null) {
-        numDocsScanned += Long.parseLong(numDocsScannedString);
-      }
-      String numEntriesScannedInFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
-      if (numEntriesScannedInFilterString != null) {
-        numEntriesScannedInFilter += Long.parseLong(numEntriesScannedInFilterString);
-      }
-      String numEntriesScannedPostFilterString = metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
-      if (numEntriesScannedPostFilterString != null) {
-        numEntriesScannedPostFilter += Long.parseLong(numEntriesScannedPostFilterString);
-      }
-      String numSegmentsQueriedString = metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
-      if (numSegmentsQueriedString != null) {
-        numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
-      }
-
-      String numSegmentsProcessedString = metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
-      if (numSegmentsProcessedString != null) {
-        numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
-      }
-      String numSegmentsMatchedString = metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
-      if (numSegmentsMatchedString != null) {
-        numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
-      }
-
-      String numConsumingString = metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
-      if (numConsumingString != null) {
-        numConsumingSegmentsProcessed += Long.parseLong(numConsumingString);
-      }
 
-      String minConsumingFreshnessTimeMsString = metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
-      if (minConsumingFreshnessTimeMsString != null) {
-        minConsumingFreshnessTimeMs =
-            Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), minConsumingFreshnessTimeMs);
-      }
-
-      String threadCpuTimeNsString = metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
-      if (threadCpuTimeNsString != null) {
-        if (entry.getKey().getTableType() == TableType.OFFLINE) {
-          offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
-        } else {
-          realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
-        }
-      }
-
-      String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
-      if (systemActivitiesCpuTimeNsString != null) {
-        if (entry.getKey().getTableType() == TableType.OFFLINE) {
-          offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
-        } else {
-          realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
-        }
-      }
-
-      String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-      if (responseSerializationCpuTimeNsString != null) {
-        if (entry.getKey().getTableType() == TableType.OFFLINE) {
-          offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
-        } else {
-          realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
-        }
-      }
-      offlineTotalCpuTimeNs =
-          offlineThreadCpuTimeNs + offlineSystemActivitiesCpuTimeNs + offlineResponseSerializationCpuTimeNs;
-      realtimeTotalCpuTimeNs =
-          realtimeThreadCpuTimeNs + realtimeSystemActivitiesCpuTimeNs + realtimeResponseSerializationCpuTimeNs;
-
-      String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName());
-      if (numTotalDocsString != null) {
-        numTotalDocs += Long.parseLong(numTotalDocsString);
-      }
-      numGroupsLimitReached |= Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+      // aggregate metrics
+      aggregator.aggregate(entry.getKey(), dataTable);
 
       // After processing the metadata, remove data tables without data rows inside.
       DataSchema dataSchema = dataTable.getDataSchema();
@@ -269,59 +91,11 @@ public class BrokerReduceService {
       }
     }
 
-    // Set execution statistics.
-    brokerResponseNative.setNumDocsScanned(numDocsScanned);
-    brokerResponseNative.setNumEntriesScannedInFilter(numEntriesScannedInFilter);
-    brokerResponseNative.setNumEntriesScannedPostFilter(numEntriesScannedPostFilter);
-    brokerResponseNative.setNumSegmentsQueried(numSegmentsQueried);
-    brokerResponseNative.setNumSegmentsProcessed(numSegmentsProcessed);
-    brokerResponseNative.setNumSegmentsMatched(numSegmentsMatched);
-    brokerResponseNative.setTotalDocs(numTotalDocs);
-    brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
-    brokerResponseNative.setOfflineThreadCpuTimeNs(offlineThreadCpuTimeNs);
-    brokerResponseNative.setRealtimeThreadCpuTimeNs(realtimeThreadCpuTimeNs);
-    brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(offlineSystemActivitiesCpuTimeNs);
-    brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(realtimeSystemActivitiesCpuTimeNs);
-    brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(offlineResponseSerializationCpuTimeNs);
-    brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(realtimeResponseSerializationCpuTimeNs);
-    brokerResponseNative.setOfflineTotalCpuTimeNs(offlineTotalCpuTimeNs);
-    brokerResponseNative.setRealtimeTotalCpuTimeNs(realtimeTotalCpuTimeNs);
-    if (numConsumingSegmentsProcessed > 0) {
-      brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsProcessed);
-      brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
-    }
-
-    // Update broker metrics.
     String tableName = brokerRequest.getQuerySource().getTableName();
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    if (brokerMetrics != null) {
-      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, numDocsScanned);
-      brokerMetrics
-          .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
-      brokerMetrics
-          .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, offlineThreadCpuTimeNs,
-          TimeUnit.NANOSECONDS);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS, realtimeThreadCpuTimeNs,
-          TimeUnit.NANOSECONDS);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
-          offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
-          realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
-          offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
-          realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, offlineTotalCpuTimeNs,
-          TimeUnit.NANOSECONDS);
-      brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, realtimeTotalCpuTimeNs,
-          TimeUnit.NANOSECONDS);
 
-      if (numConsumingSegmentsProcessed > 0 && minConsumingFreshnessTimeMs > 0) {
-        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
-            System.currentTimeMillis() - minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
-      }
-    }
+    // Set execution statistics and Update broker metrics.
+    aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
 
     // NOTE: When there is no cached data schema, that means all servers encountered exception. In such case, return the
     //       response with metadata only.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index e5e9bf8..a48a007 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -22,6 +22,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
@@ -65,4 +66,19 @@ public final class ResultReducerFactory {
       }
     }
   }
+
+  /**
+   * Returns the {@link StreamingReducer} for the given query.
+   * <p>Only selection queries without ORDER BY are accepted.
+   *
+   * @param queryContext context of the query to reduce
+   * @return a {@link SelectionOnlyStreamingReducer} for the query
+   * @throws UnsupportedOperationException if the query is not a selection query or has ORDER BY expressions
+   */
+  public static StreamingReducer getStreamingReducer(QueryContext queryContext) {
+    if (!QueryContextUtils.isSelectionQuery(queryContext) || queryContext.getOrderByExpressions() != null) {
+      throw new UnsupportedOperationException("Only selection queries without ORDER BY are supported");
+    }
+    return new SelectionOnlyStreamingReducer(queryContext);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
new file mode 100644
index 0000000..7fa7f84
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionOnlyStreamingReducer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.QueryOptionsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Streaming reducer for selection queries whose result is rendered without ordering.
+ * <p>Rows of the data tables handed to {@link #reduce} are collected in arrival order until the query limit is
+ * reached; {@link #seal} turns the collected rows into the result table of the broker response.
+ */
+public class SelectionOnlyStreamingReducer implements StreamingReducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOnlyStreamingReducer.class);
+
+  private final QueryContext _queryContext;
+  private final boolean _preserveType;
+  private final int _limit;
+
+  // Data schema taken from the data tables passed to reduce(); null until one is seen.
+  private DataSchema _dataSchema;
+  private DataTableReducerContext _dataTableReducerContext;
+  private List<Object[]> _rows;
+
+  /**
+   * Creates a reducer for the given query.
+   *
+   * @param queryContext context of the query; its query options must select the SQL response format
+   * @throws IllegalStateException if the query options do not select the SQL response format
+   */
+  public SelectionOnlyStreamingReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _limit = _queryContext.getLimit();
+    Map<String, String> queryOptions = queryContext.getQueryOptions();
+    Preconditions.checkState(QueryOptionsUtils.isResponseFormatSQL(queryOptions), "only SQL response is supported");
+
+    _preserveType = QueryOptionsUtils.isPreserveType(queryOptions);
+    _dataSchema = null;
+  }
+
+  /**
+   * Stores the reducer context and allocates the row holder, pre-sized to the query limit capped at
+   * {@link SelectionOperatorUtils#MAX_ROW_HOLDER_INITIAL_CAPACITY}. Must be called before {@link #reduce}.
+   */
+  @Override
+  public void init(DataTableReducerContext dataTableReducerContext) {
+    _dataTableReducerContext = dataTableReducerContext;
+    _rows = new ArrayList<>(Math.min(_limit, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+  }
+
+  /**
+   * Appends rows of the given data table to the result until the query limit is reached. The first data schema
+   * seen is kept for rendering the result.
+   */
+  @Override
+  public synchronized void reduce(ServerRoutingInstance key, DataTable dataTable) {
+    if (_dataSchema == null) {
+      _dataSchema = dataTable.getDataSchema();
+    }
+    // TODO: For data table map with more than one data tables, remove conflicting data tables
+    reduceWithoutOrdering(dataTable, _limit);
+  }
+
+  /** Copies rows from the data table, in row-id order, until {@code limit} rows are held in total. */
+  private void reduceWithoutOrdering(DataTable dataTable, int limit) {
+    int numRows = dataTable.getNumberOfRows();
+    for (int rowId = 0; rowId < numRows && _rows.size() < limit; rowId++) {
+      _rows.add(SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId));
+    }
+  }
+
+  /**
+   * Builds the broker response from the rows collected so far. Takes the same monitor as {@link #reduce} so that
+   * the rows and schema written by reducing threads are visible here.
+   *
+   * @return a response whose result table holds the collected rows; an empty result table when a schema is known
+   *         but no rows were collected; a response without result table when no data table was ever reduced
+   */
+  @Override
+  public synchronized BrokerResponseNative seal() {
+    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
+    if (_dataSchema == null) {
+      // No data table was reduced, so there is no schema to build a result table from. Return the bare response
+      // instead of handing a null schema to SelectionOperatorUtils.
+      return brokerResponseNative;
+    }
+    List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, _dataSchema);
+    if (!_rows.isEmpty()) {
+      brokerResponseNative.setResultTable(
+          SelectionOperatorUtils.renderResultTableWithoutOrdering(_rows, _dataSchema, selectionColumns));
+    } else {
+      // No rows collected: construct an empty result using the cached data schema for selection query
+      DataSchema selectionDataSchema = SelectionOperatorUtils.getResultTableDataSchema(_dataSchema, selectionColumns);
+      brokerResponseNative.setResultTable(new ResultTable(selectionDataSchema, Collections.emptyList()));
+    }
+    return brokerResponseNative;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
new file mode 100644
index 0000000..3a80494
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The <code>StreamingReduceService</code> class provides service to reduce grpc response gathered from multiple servers
+ * to {@link BrokerResponseNative}.
+ */
+@ThreadSafe
+public class StreamingReduceService extends BaseReduceService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamingReduceService.class);
+
+  public StreamingReduceService(PinotConfiguration config) {
+    super(config);
+  }
+
+  /**
+   * Reduces the streamed responses of all queried servers into a single broker response.
+   *
+   * @param brokerRequest request being served; supplies the query options, query context and table name
+   * @param serverResponseMap response iterator of each queried server
+   * @param reduceTimeOutMs maximum time in milliseconds to wait for the server responses to be processed
+   * @param brokerMetrics broker metrics handed to the stats aggregator, may be null
+   * @return the reduced response, or {@link BrokerResponseNative#empty()} when {@code serverResponseMap} is empty
+   * @throws IOException if processing the server responses fails, times out or is interrupted
+   */
+  public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest,
+      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
+      @Nullable BrokerMetrics brokerMetrics) throws IOException {
+    if (serverResponseMap.isEmpty()) {
+      // Empty response.
+      return BrokerResponseNative.empty();
+    }
+
+    // prepare contextual info for reduce.
+    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+    Map<String, String> queryOptions =
+        pinotQuery != null ? pinotQuery.getQueryOptions() : brokerRequest.getQueryOptions();
+    boolean enableTrace =
+        queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
+
+    QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
+
+    String tableName = brokerRequest.getQuerySource().getTableName();
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+
+    // aggregator for the execution statistics reported by the servers.
+    ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
+
+    // Process server response.
+    DataTableReducerContext dataTableReducerContext =
+        new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
+            _groupByTrimThreshold);
+    StreamingReducer streamingReducer = ResultReducerFactory.getStreamingReducer(queryContext);
+
+    streamingReducer.init(dataTableReducerContext);
+
+    try {
+      processIterativeServerResponse(streamingReducer, _reduceExecutorService, serverResponseMap, reduceTimeOutMs,
+          aggregator);
+    } catch (Exception e) {
+      LOGGER.error("Unable to process streaming query response!", e);
+      throw new IOException("Unable to process streaming query response!", e);
+    }
+
+    // seal the streaming response.
+    BrokerResponseNative brokerResponseNative = streamingReducer.seal();
+
+    // Set execution statistics and Update broker metrics.
+    aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
+
+    updateAlias(queryContext, brokerResponseNative);
+    return brokerResponseNative;
+  }
+
+  /**
+   * Drains the response stream of every server on the given executor, one task per server. Blocks with a data
+   * schema are handed to the reducer; blocks without one are metadata-only and go to the stats aggregator.
+   *
+   * @param reducer reducer receiving the data blocks
+   * @param executorService executor the per-server tasks are submitted to
+   * @param serverResponseMap response iterator of each queried server
+   * @param reduceTimeOutMs maximum time in milliseconds to wait for all tasks to finish
+   * @param aggregator aggregator receiving the metadata-only blocks
+   * @throws TimeoutException if the tasks do not finish within {@code reduceTimeOutMs}
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   * @throws java.util.concurrent.ExecutionException if a task failed while processing its server response
+   */
+  private static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService,
+      Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
+      ExecutionStatsAggregator aggregator) throws Exception {
+    int cnt = 0;
+    Future<?>[] futures = new Future<?>[serverResponseMap.size()];
+    CountDownLatch countDownLatch = new CountDownLatch(serverResponseMap.size());
+
+    for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry : serverResponseMap.entrySet()) {
+      futures[cnt++] = executorService.submit(() -> {
+        Iterator<Server.ServerResponse> streamingResponses = entry.getValue();
+        try {
+          while (streamingResponses.hasNext()) {
+            Server.ServerResponse streamingResponse = streamingResponses.next();
+            DataTable dataTable = DataTableFactory.getDataTable(streamingResponse.getPayload().asReadOnlyByteBuffer());
+            // null dataSchema is a metadata-only block.
+            if (dataTable.getDataSchema() != null) {
+              reducer.reduce(entry.getKey(), dataTable);
+            } else {
+              aggregator.aggregate(entry.getKey(), dataTable);
+            }
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Unable to process streaming response. Failure occurred!", e);
+        } finally {
+          countDownLatch.countDown();
+        }
+      });
+    }
+
+    boolean completed;
+    try {
+      // await() reports a timeout through its return value; it throws only when this thread is interrupted.
+      completed = countDownLatch.await(reduceTimeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      cancelUnfinished(futures);
+      // Restore the interrupt flag: the caller wraps this exception and would otherwise lose the interruption.
+      Thread.currentThread().interrupt();
+      throw e;
+    }
+    if (!completed) {
+      cancelUnfinished(futures);
+      throw new TimeoutException("Timed out in broker reduce phase.");
+    }
+
+    // All tasks have counted down, so get() returns promptly; it rethrows the failure of any task that failed.
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
+
+  /** Cancels, interrupting if running, every task that has not completed yet. */
+  private static void cancelUnfinished(Future<?>[] futures) {
+    for (Future<?> future : futures) {
+      if (!future.isDone()) {
+        future.cancel(true);
+      }
+    }
+  }
+
+  /** Shuts down the reduce executor immediately via {@code shutdownNow()}. */
+  public void shutDown() {
+    _reduceExecutorService.shutdownNow();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java
new file mode 100644
index 0000000..6d8a6e2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReducer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+
+
+/**
+ * Reducer that consumes server data tables one at a time and produces the broker response when sealed.
+ */
+public interface StreamingReducer {
+
+  /**
+   * Supplies the reducer context.
+   * NOTE(review): presumably called once, before the first {@link #reduce} - confirm.
+   */
+  void init(DataTableReducerContext dataTableReducerContext);
+
+  /**
+   * Folds one data table received from a server into the reducer's state.
+   *
+   * @param key routing instance of the server the data table came from
+   * @param dataTable data table to reduce
+   */
+  void reduce(ServerRoutingInstance key, DataTable dataTable);
+
+  /**
+   * Builds the broker response from everything reduced so far.
+   */
+  BrokerResponseNative seal();
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
index a4cc1b9..09ffb2f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
@@ -30,6 +30,7 @@ public class ServerInstance {
 
   private final String _hostname;
   private final int _port;
+  private final int _grpcPort;
   private final int _tlsPort;
 
   /**
@@ -58,16 +59,20 @@ public class ServerInstance {
     }
 
     int tlsPort = -1;
+    int grpcPort = -1;
     if (instanceConfig.getRecord() != null) {
       tlsPort = instanceConfig.getRecord().getIntField(Helix.Instance.NETTYTLS_PORT_KEY, -1);
+      grpcPort = instanceConfig.getRecord().getIntField(Helix.Instance.GRPC_PORT_KEY, -1);
     }
     _tlsPort = tlsPort;
+    _grpcPort = grpcPort;
   }
 
   @VisibleForTesting
   ServerInstance(String hostname, int port) {
     _hostname = hostname;
     _port = port;
+    _grpcPort = -1;
     _tlsPort = -1;
   }
 
@@ -79,10 +84,15 @@ public class ServerInstance {
     return _port;
   }
 
+  public int getGrpcPort() {
+    return _grpcPort;
+  }
+
   public ServerRoutingInstance toServerRoutingInstance(TableType tableType) {
     return new ServerRoutingInstance(_hostname, _port, tableType);
   }
 
+  @Deprecated
   public ServerRoutingInstance toServerRoutingInstance(TableType tableType, boolean preferTls) {
     if (!preferTls) {
       return toServerRoutingInstance(tableType);
@@ -95,6 +105,26 @@ public class ServerInstance {
     return new ServerRoutingInstance(_hostname, _tlsPort, tableType, true);
   }
 
+  public ServerRoutingInstance toServerRoutingInstance(TableType tableType, Type type) {
+    switch (type) { // picks the port of the returned routing instance based on the requested type
+      case GRPC: // gRPC port when it is set (> 0), otherwise fall back to the regular port
+        if (_grpcPort > 0) {
+          return new ServerRoutingInstance(_hostname, _grpcPort, tableType, false);
+        } else { // NOTE(review): fallback targets _port, not a gRPC port; confirm callers expect this
+          return new ServerRoutingInstance(_hostname, _port, tableType);
+        }
+      case TTS: // TLS port when it is set (> 0); the 'true' argument is presumably the TLS flag (confirm)
+        if (_tlsPort > 0) {
+          return new ServerRoutingInstance(_hostname, _tlsPort, tableType, true);
+        } else { // no TLS port set: fall back to the regular port
+          return new ServerRoutingInstance(_hostname, _port, tableType);
+        }
+      case DEFAULT:
+      default: // DEFAULT and any unhandled type use the regular port
+        return new ServerRoutingInstance(_hostname, _port, tableType);
+    }
+  }
+
   @Override
   public int hashCode() {
     return 31 * _hostname.hashCode() + _port;
@@ -119,4 +149,10 @@ public class ServerInstance {
   public String toString() {
     return Helix.PREFIX_OF_SERVER_INSTANCE + _hostname + HOSTNAME_PORT_DELIMITER + _port;
   }
+
+  public enum Type { // selects which port toServerRoutingInstance(TableType, Type) uses
+    DEFAULT, // the regular port (_port)
+    GRPC, // the gRPC port when set, otherwise the regular port
+    TTS // the TLS port when set, otherwise the regular port; NOTE(review): name looks like a typo of TLS
+  }
 }
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index e4abe2c..0ba4cc1 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -28,6 +28,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -100,16 +101,21 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected void startBroker(int port, String zkStr)
       throws Exception {
-    startBrokers(1, port, zkStr);
+    startBrokers(1, port, zkStr, Collections.emptyMap());
   }
 
   protected void startBrokers(int numBrokers)
       throws Exception {
-    startBrokers(numBrokers, DEFAULT_BROKER_PORT, getZkUrl());
+    startBrokers(numBrokers, DEFAULT_BROKER_PORT, getZkUrl(), Collections.emptyMap());
   }
 
   protected void startBrokers(int numBrokers, int basePort, String zkStr)
       throws Exception {
+    startBrokers(numBrokers, basePort, zkStr, Collections.emptyMap());
+  }
+
+  protected void startBrokers(int numBrokers, int basePort, String zkStr, Map<String, Object> extraProperties)
+      throws Exception {
     _brokerStarters = new ArrayList<>(numBrokers);
     _brokerPorts = new ArrayList<>();
     for (int i = 0; i < numBrokers; i++) {
@@ -121,6 +127,7 @@ public abstract class ClusterTest extends ControllerTest {
       _brokerPorts.add(port);
       properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, port);
       properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+      properties.putAll(extraProperties);
       PinotConfiguration configuration = new PinotConfiguration(properties);
       overrideBrokerConf(configuration);
 
@@ -185,12 +192,15 @@ public abstract class ClusterTest extends ControllerTest {
   }
 
   protected void startServer(PinotConfiguration configuration) {
-    startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
+    startServers(1, configuration);
   }
 
   protected void startServers(int numServers) {
-    startServers(numServers, getDefaultServerConfiguration(), Server.DEFAULT_ADMIN_API_PORT,
-        Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
+    startServers(numServers, getDefaultServerConfiguration());
+  }
+
+  protected void startServers(int numServers, PinotConfiguration configuration) {
+    startServers(numServers, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl());
   }
 
   protected void startServers(int numServers, int baseAdminApiPort, int baseNettyPort, String zkStr) {
@@ -199,6 +209,11 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
       String zkStr) {
+    startServers(numServers, configuration, baseAdminApiPort, baseNettyPort, Server.DEFAULT_GRPC_PORT, zkStr);
+  }
+
+  protected void startServers(int numServers, PinotConfiguration configuration, int baseAdminApiPort, int baseNettyPort,
+      int baseGrpcPort, String zkStr) {
     FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
     _serverStarters = new ArrayList<>(numServers);
     overrideServerConf(configuration);
@@ -211,6 +226,10 @@ public abstract class ClusterTest extends ControllerTest {
             .setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
         configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, baseAdminApiPort - i);
         configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
+        if (configuration.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, false)) {
+          configuration.setProperty(Server.CONFIG_OF_GRPC_PORT, baseGrpcPort + i);
+        }
+        configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + i);
         // Thread time measurement is disabled by default, enable it in integration tests.
         // TODO: this can be removed when we eventually enable thread time measurement by default.
         configuration.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
new file mode 100644
index 0000000..3a69e24
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GrpcBrokerClusterIntegrationTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that loads Avro data into a hybrid (offline + realtime) table and runs selection-only queries.
+ */
+public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final String TENANT_NAME = "TestTenant";
+  private static final int NUM_OFFLINE_SEGMENTS = 8;
+  private static final int NUM_REALTIME_SEGMENTS = 6;
+
+  @Override
+  protected String getBrokerTenant() {
+    return TENANT_NAME;
+  }
+
+  @Override
+  protected String getServerTenant() {
+    return TENANT_NAME;
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) { // NOTE(review): empty on purpose? confirm
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start Zk, Kafka and Pinot
+    startHybridCluster();
+
+    List<File> avroFiles = getAllAvroFiles();
+    List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS);
+    List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    addTableConfig(offlineTableConfig);
+    addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Push data into Kafka
+    pushAvroIntoKafka(realtimeAvroFiles);
+
+    // Set up the H2 connection
+    setUpH2Connection(avroFiles);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // TODO: waitForAllDocsLoaded doesn't work here, so we simply wait for 5 seconds. Will be fixed after:
+    // https://github.com/apache/pinot/pull/7839
+    // waitForAllDocsLoaded(600_000L);
+    Thread.sleep(5000);
+  }
+
+  protected void startHybridCluster()
+      throws Exception {
+    // Start Zk and Kafka
+    startZk();
+    startKafka();
+
+    // Start the Pinot cluster with tenant isolation disabled
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
+
+    startController(properties);
+
+    startBroker(); // NOTE(review): no handler-type override visible here; confirm the gRPC handler is used
+
+    // Enable the gRPC server on the two Pinot servers started below
+    PinotConfiguration serverConfig = getDefaultServerConfiguration();
+    serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
+    startServers(2, serverConfig);
+
+    // Create tenants
+    createBrokerTenant(TENANT_NAME, 1);
+    createServerTenant(TENANT_NAME, 1, 1);
+  }
+
+  @Test
+  public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery()
+      throws Exception {
+    String sql;
+    sql = "SELECT * FROM mytable LIMIT 1000000";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT * FROM mytable WHERE DaysSinceEpoch > 16312 LIMIT 10000000";
+    testSqlQuery(sql, Collections.singletonList(sql));
+    sql = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable LIMIT 10000000";
+    testSqlQuery(sql, Collections.singletonList(sql));
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 17ad6da..721e3d8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -217,6 +217,11 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold";
     public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
 
+    public static final String BROKER_REQUEST_HANDLER_TYPE = "pinot.broker.request.handler.type";
+    public static final String NETTY_BROKER_REQUEST_HANDLER_TYPE = "netty";
+    public static final String GRPC_BROKER_REQUEST_HANDLER_TYPE = "grpc";
+    public static final String DEFAULT_BROKER_REQUEST_HANDLER_TYPE = NETTY_BROKER_REQUEST_HANDLER_TYPE;
+
     public static final String BROKER_TLS_PREFIX = "pinot.broker.tls";
     public static final String BROKER_NETTYTLS_ENABLED = "pinot.broker.nettytls.enabled";
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org