Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/19 18:55:00 UTC

[incubator-pinot] branch master updated: Support streaming query in QueryExecutor (#6027)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe047fd  Support streaming query in QueryExecutor (#6027)
fe047fd is described below

commit fe047fda0145603ed7a8ffb4822f1de528b3dc12
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sat Sep 19 11:54:43 2020 -0700

    Support streaming query in QueryExecutor (#6027)
    
    Add a new method `DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver);` to the `QueryExecutor` interface to support gRPC streaming queries.
    Remove `GrpcQueryExecutor`, which is almost identical to `ServerQueryExecutorV1Impl`.
---
 .../core/query/executor/GrpcQueryExecutor.java     | 327 ---------------------
 .../pinot/core/query/executor/QueryExecutor.java   |  26 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  |  29 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  76 ++++-
 .../query/scheduler/PrioritySchedulerTest.java     |  36 ++-
 .../pinot/server/starter/ServerInstance.java       |   5 +-
 6 files changed, 133 insertions(+), 366 deletions(-)
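
For reference, the reshaped `QueryExecutor` contract described in the commit message boils down to the two methods below (trimmed from the interface diff further down; imports and the other interface methods are omitted):

    public interface QueryExecutor {
      // Non-streaming entry point: delegates to the streaming-aware
      // variant with a null observer.
      default DataTable processQuery(ServerQueryRequest queryRequest,
          ExecutorService executorService) {
        return processQuery(queryRequest, executorService, null);
      }

      // Streaming-aware entry point. For a streaming request the data is
      // pushed through the observer and the returned DataTable carries only
      // metadata; for a non-streaming request it carries data and metadata.
      DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
          @Nullable StreamObserver<Server.ServerResponse> responseObserver);
    }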

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
deleted file mode 100644
index e64fd34..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.executor;
-
-import io.grpc.Status;
-import io.grpc.stub.StreamObserver;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.metrics.ServerQueryPhase;
-import org.apache.pinot.common.proto.PinotQueryServerGrpc;
-import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
-import org.apache.pinot.core.common.datatable.DataTableUtils;
-import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.TableDataManager;
-import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
-import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
-import org.apache.pinot.core.plan.Plan;
-import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
-import org.apache.pinot.core.plan.maker.PlanMaker;
-import org.apache.pinot.core.query.config.QueryExecutorConfig;
-import org.apache.pinot.core.query.exception.BadQueryRequestException;
-import org.apache.pinot.core.query.pruner.SegmentPrunerService;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.TimerContext;
-import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
-import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
-import org.apache.pinot.core.util.QueryOptions;
-import org.apache.pinot.core.util.trace.TraceContext;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Query executor for gRPC server requests.
- * <ul>
- *   <li>
- *     For streaming request, multiple (could be 0 if no data should be returned, or query encounters exception) data
- *     responses will be returned, followed by one single metadata response.
- *   </li>
- *   <li>
- *     For non-streaming request, one single response containing both data and metadata will be returned.
- *   </li>
- * </ul>
- * TODO: Plug in QueryScheduler
- */
-public class GrpcQueryExecutor extends PinotQueryServerGrpc.PinotQueryServerImplBase {
-  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryExecutor.class);
-
-  private final InstanceDataManager _instanceDataManager;
-  private final ServerMetrics _serverMetrics;
-  private final long _defaultTimeOutMs;
-  private final SegmentPrunerService _segmentPrunerService;
-  private final PlanMaker _planMaker;
-  private final ExecutorService _executorService =
-      Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
-
-  public GrpcQueryExecutor(PinotConfiguration config, InstanceDataManager instanceDataManager,
-      ServerMetrics serverMetrics)
-      throws ConfigurationException {
-    _instanceDataManager = instanceDataManager;
-    _serverMetrics = serverMetrics;
-    QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
-    long defaultTimeoutMs = queryExecutorConfig.getTimeOut();
-    _defaultTimeOutMs =
-        defaultTimeoutMs > 0 ? defaultTimeoutMs : CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
-    _segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
-    _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
-    LOGGER.info("Initialized PinotQueryHandler with default timeout: {}ms, numWorkerThreads: {}", _defaultTimeOutMs,
-        ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
-  }
-
-  @Override
-  public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
-    // Deserialize the request
-    ServerQueryRequest queryRequest;
-    try {
-      queryRequest = new ServerQueryRequest(request, _serverMetrics);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while deserializing the request: {}", request, e);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
-      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
-      return;
-    }
-
-    // Process the query
-    try {
-      processQuery(queryRequest, responseObserver);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while processing request {}: {} from broker: {}", queryRequest.getRequestId(),
-          queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
-      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
-    }
-  }
-
-  private void processQuery(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver) {
-    TimerContext timerContext = queryRequest.getTimerContext();
-    TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
-    if (schedulerWaitTimer != null) {
-      schedulerWaitTimer.stopAndRecord();
-    }
-    long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
-    long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs;
-    TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);
-
-    long requestId = queryRequest.getRequestId();
-    String tableNameWithType = queryRequest.getTableNameWithType();
-    QueryContext queryContext = queryRequest.getQueryContext();
-    LOGGER.debug("Incoming request Id: {}, query: {}", requestId, queryContext);
-    // Use the timeout passed from the request if exists, or the instance-level timeout
-    long queryTimeoutMs = _defaultTimeOutMs;
-    Map<String, String> queryOptions = queryContext.getQueryOptions();
-    if (queryOptions != null) {
-      Long timeoutFromQueryOptions = QueryOptions.getTimeoutMs(queryOptions);
-      if (timeoutFromQueryOptions != null) {
-        queryTimeoutMs = timeoutFromQueryOptions;
-      }
-    }
-
-    // Query scheduler wait time already exceeds query timeout, directly return
-    if (querySchedulingTimeMs >= queryTimeoutMs) {
-      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
-      String errorMessage = String
-          .format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs,
-              queryTimeoutMs);
-      DataTable dataTable = new DataTableImplV2();
-      dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage));
-      LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
-      sendResponse(queryRequest, streamObserver, dataTable);
-      return;
-    }
-
-    TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
-    if (tableDataManager == null) {
-      String errorMessage = "Failed to find table: " + tableNameWithType;
-      DataTable dataTable = new DataTableImplV2();
-      dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage));
-      LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
-      sendResponse(queryRequest, streamObserver, dataTable);
-      return;
-    }
-
-    List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
-    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery);
-
-    // When segment is removed from the IdealState:
-    // 1. Controller schedules a state transition to server to turn segment OFFLINE
-    // 2. Server gets the state transition, removes the segment data manager and update its CurrentState
-    // 3. Controller gathers the CurrentState and update the ExternalView
-    // 4. Broker watches ExternalView change and updates the routing table to stop querying the segment
-    //
-    // After step 2 but before step 4, segment will be missing on server side
-    // TODO: Change broker to watch both IdealState and ExternalView to not query the removed segments
-    int numSegmentsQueried = segmentsToQuery.size();
-    int numSegmentsAcquired = segmentDataManagers.size();
-    if (numSegmentsQueried > numSegmentsAcquired) {
-      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS,
-          numSegmentsQueried - numSegmentsAcquired);
-    }
-
-    boolean enableTrace = queryRequest.isEnableTrace();
-    if (enableTrace) {
-      TraceContext.register(requestId);
-    }
-
-    int numConsumingSegmentsProcessed = 0;
-    long minIndexTimeMs = Long.MAX_VALUE;
-    long minIngestionTimeMs = Long.MAX_VALUE;
-    // gather stats for realtime consuming segments
-    for (SegmentDataManager segmentMgr : segmentDataManagers) {
-      if (segmentMgr.getSegment() instanceof MutableSegment) {
-        numConsumingSegmentsProcessed += 1;
-        SegmentMetadata metadata = segmentMgr.getSegment().getSegmentMetadata();
-        long indexedTime = metadata.getLastIndexedTimestamp();
-        if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) {
-          minIndexTimeMs = metadata.getLastIndexedTimestamp();
-        }
-        long ingestionTime = metadata.getLatestIngestionTimestamp();
-        if (ingestionTime != Long.MIN_VALUE && ingestionTime < minIngestionTimeMs) {
-          minIngestionTimeMs = ingestionTime;
-        }
-      }
-    }
-
-    long minConsumingFreshnessTimeMs = minIngestionTimeMs;
-    if (numConsumingSegmentsProcessed > 0) {
-      if (minIngestionTimeMs == Long.MAX_VALUE) {
-        LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead");
-        minConsumingFreshnessTimeMs = minIndexTimeMs;
-      }
-      LOGGER
-          .debug("Querying: {} consuming segments with minConsumingFreshnessTimeMs: {}", numConsumingSegmentsProcessed,
-              minConsumingFreshnessTimeMs);
-    }
-
-    DataTable dataTable = null;
-    try {
-      // Compute total docs for the table before pruning the segments
-      long numTotalDocs = 0;
-      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-        numTotalDocs += segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
-      }
-      TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
-      segmentDataManagers = _segmentPrunerService.prune(tableDataManager, segmentDataManagers, queryRequest);
-      segmentPruneTimer.stopAndRecord();
-      int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
-      LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning);
-      if (numSegmentsMatchedAfterPruning == 0) {
-        dataTable =
-            queryRequest.isEnableStreaming() ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
-        Map<String, String> metadata = dataTable.getMetadata();
-        metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
-        metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
-        metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
-        metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0");
-        metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
-        metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
-      } else {
-        TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
-        List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsMatchedAfterPruning);
-        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-          indexSegments.add(segmentDataManager.getSegment());
-        }
-        long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
-        Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker
-            .makeStreamingInstancePlan(indexSegments, queryContext, _executorService, streamObserver, endTimeMs)
-            : _planMaker.makeInstancePlan(indexSegments, queryContext, _executorService, endTimeMs);
-        planBuildTimer.stopAndRecord();
-
-        TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
-        dataTable = globalQueryPlan.execute();
-        planExecTimer.stopAndRecord();
-
-        // Update the total docs in the metadata based on un-pruned segments.
-        dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, Long.toString(numTotalDocs));
-      }
-    } catch (Exception e) {
-      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
-
-      // Do not log error for BadQueryRequestException because it's caused by bad query
-      if (e instanceof BadQueryRequestException) {
-        LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
-      } else {
-        LOGGER.error("Exception processing requestId {}", requestId, e);
-      }
-
-      dataTable = new DataTableImplV2();
-      dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
-    } finally {
-      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-        tableDataManager.releaseSegment(segmentDataManager);
-      }
-      if (enableTrace) {
-        if (dataTable != null) {
-          dataTable.getMetadata().put(DataTable.TRACE_INFO_METADATA_KEY, TraceContext.getTraceInfo());
-        }
-        TraceContext.unregister();
-      }
-    }
-
-    queryProcessingTimer.stopAndRecord();
-    long queryProcessingTime = queryProcessingTimer.getDurationMs();
-    dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried));
-    dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime));
-
-    if (numConsumingSegmentsProcessed > 0) {
-      dataTable.getMetadata()
-          .put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, Integer.toString(numConsumingSegmentsProcessed));
-      dataTable.getMetadata()
-          .put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, Long.toString(minConsumingFreshnessTimeMs));
-    }
-
-    LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime);
-    LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable);
-
-    // TODO: Log query stats
-
-    sendResponse(queryRequest, streamObserver, dataTable);
-  }
-
-  private void sendResponse(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver,
-      DataTable dataTable) {
-    Server.ServerResponse response;
-    try {
-      response = queryRequest.isEnableStreaming() ? StreamingResponseUtils.getMetadataResponse(dataTable)
-          : StreamingResponseUtils.getNonStreamingResponse(dataTable);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while constructing response from data table for request {}: {} from broker: {}",
-          queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
-      streamObserver.onError(Status.INTERNAL.withCause(e).asException());
-      return;
-    }
-    streamObserver.onNext(response);
-    streamObserver.onCompleted();
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
index f5ada2f..42140bc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
@@ -18,12 +18,13 @@
  */
 package org.apache.pinot.core.query.executor;
 
+import io.grpc.stub.StreamObserver;
 import java.util.concurrent.ExecutorService;
-
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -53,7 +54,24 @@ public interface QueryExecutor {
   void shutDown();
 
   /**
-   * Processes the query with the given executor service.
+   * Processes the non-streaming query with the given executor service.
+   */
+  default DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService) {
+    return processQuery(queryRequest, executorService, null);
+  }
+
+  /**
+   * Processes the query (streaming or non-streaming) with the given executor service.
+   * <ul>
+   *   <li>
+   *     For streaming request, the returned DataTable contains only the metadata. The response is streamed back via the
+   *     observer.
+   *   </li>
+   *   <li>
+   *     For non-streaming request, the returned DataTable contains both data and metadata.
+   *   </li>
+   * </ul>
    */
-  DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService);
+  DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
+      @Nullable StreamObserver<Server.ServerResponse> responseObserver);
 }
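
A minimal sketch of how the two entry points are exercised (the variable names here are illustrative, not taken from the commit):

    // Non-streaming path, e.g. from the query scheduler: the default overload
    // suffices and returns data plus metadata in a single DataTable.
    DataTable result = queryExecutor.processQuery(serverQueryRequest, workerExecutorService);

    // Streaming path, e.g. from the gRPC server: data blocks are streamed back
    // through the observer and the returned DataTable holds only the metadata.
    DataTable metadata =
        queryExecutor.processQuery(serverQueryRequest, workerExecutorService, responseObserver);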
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index df29292..ae3d3ce 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -18,17 +18,19 @@
  */
 package org.apache.pinot.core.query.executor;
 
-import com.google.common.base.Preconditions;
+import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -59,9 +61,9 @@ import org.slf4j.LoggerFactory;
 public class ServerQueryExecutorV1Impl implements QueryExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerQueryExecutorV1Impl.class);
 
-  private InstanceDataManager _instanceDataManager = null;
-  private SegmentPrunerService _segmentPrunerService = null;
-  private PlanMaker _planMaker = null;
+  private InstanceDataManager _instanceDataManager;
+  private SegmentPrunerService _segmentPrunerService;
+  private PlanMaker _planMaker;
   private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
   private ServerMetrics _serverMetrics;
 
@@ -94,7 +96,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
   }
 
   @Override
-  public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService) {
+  public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
+      @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
     TimerContext timerContext = queryRequest.getTimerContext();
     TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
     if (schedulerWaitTimer != null) {
@@ -131,7 +134,13 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     }
 
     TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
-    Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: " + tableNameWithType);
+    if (tableDataManager == null) {
+      String errorMessage = "Failed to find table: " + tableNameWithType;
+      DataTable dataTable = new DataTableImplV2();
+      dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage));
+      LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
+      return dataTable;
+    }
 
     List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
     List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery);
@@ -199,7 +208,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
       LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning);
       if (numSegmentsMatchedAfterPruning == 0) {
-        dataTable = DataTableUtils.buildEmptyDataTable(queryContext);
+        // Only return metadata for streaming query
+        dataTable =
+            queryRequest.isEnableStreaming() ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
         Map<String, String> metadata = dataTable.getMetadata();
         metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
         metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
@@ -214,7 +225,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
           indexSegments.add(segmentDataManager.getSegment());
         }
         long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
-        Plan globalQueryPlan = _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
+        Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker
+            .makeStreamingInstancePlan(indexSegments, queryContext, executorService, responseObserver, endTimeMs)
+            : _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
         planBuildTimer.stopAndRecord();
 
         TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 7413070..64a6407 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -20,18 +20,45 @@ package org.apache.pinot.core.transport.grpc;
 
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.PinotQueryServerGrpc;
+import org.apache.pinot.common.proto.Server.ServerRequest;
+import org.apache.pinot.common.proto.Server.ServerResponse;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class GrpcQueryServer {
+// TODO: Plug in QueryScheduler
+public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryServer.class);
+
+  private final QueryExecutor _queryExecutor;
+  private final ServerMetrics _serverMetrics;
   private final Server _server;
+  private final ExecutorService _executorService =
+      Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
 
-  public GrpcQueryServer(int port, GrpcQueryExecutor queryExecutor) {
-    _server = ServerBuilder.forPort(port).addService(queryExecutor).build();
+  public GrpcQueryServer(int port, QueryExecutor queryExecutor, ServerMetrics serverMetrics) {
+    _queryExecutor = queryExecutor;
+    _serverMetrics = serverMetrics;
+    _server = ServerBuilder.forPort(port).addService(this).build();
+    LOGGER.info("Initialized GrpcQueryServer on port: {} with numWorkerThreads: {}", port,
+        ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
   }
 
   public void start() {
+    LOGGER.info("Starting GrpcQueryServer");
     try {
       _server.start();
     } catch (IOException e) {
@@ -40,10 +67,51 @@ public class GrpcQueryServer {
   }
 
   public void shutdown() {
+    LOGGER.info("Shutting down GrpcQueryServer");
     try {
       _server.shutdown().awaitTermination();
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
   }
+
+  @Override
+  public void submit(ServerRequest request, StreamObserver<ServerResponse> responseObserver) {
+    // Deserialize the request
+    ServerQueryRequest queryRequest;
+    try {
+      queryRequest = new ServerQueryRequest(request, _serverMetrics);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while deserializing the request: {}", request, e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
+      return;
+    }
+
+    // Process the query
+    DataTable dataTable;
+    try {
+      dataTable = _queryExecutor.processQuery(queryRequest, _executorService, responseObserver);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing request {}: {} from broker: {}", queryRequest.getRequestId(),
+          queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+      return;
+    }
+
+    ServerResponse response;
+    try {
+      response = queryRequest.isEnableStreaming() ? StreamingResponseUtils.getMetadataResponse(dataTable)
+          : StreamingResponseUtils.getNonStreamingResponse(dataTable);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while constructing response from data table for request {}: {} from broker: {}",
+          queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+      return;
+    }
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
 }
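
On the client side, a hypothetical consumer of this endpoint could look like the sketch below. It assumes `submit` is declared as a server-streaming RPC in the proto (consistent with the data-responses-then-metadata contract described earlier); the `"responseType"` metadata key and the port are assumptions for illustration, not taken from this commit:

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import java.util.Iterator;
    import org.apache.pinot.common.proto.PinotQueryServerGrpc;
    import org.apache.pinot.common.proto.Server;

    public class GrpcQueryClientSketch {
      public static void consume(Server.ServerRequest request) {
        // Plaintext channel to a Pinot server's gRPC port (assumed 8090 here).
        ManagedChannel channel =
            ManagedChannelBuilder.forAddress("localhost", 8090).usePlaintext().build();
        PinotQueryServerGrpc.PinotQueryServerBlockingStub stub =
            PinotQueryServerGrpc.newBlockingStub(channel);
        Iterator<Server.ServerResponse> responses = stub.submit(request);
        while (responses.hasNext()) {
          Server.ServerResponse response = responses.next();
          // "responseType" is a hypothetical key: the final response carries the
          // query metadata, all earlier ones carry blocks of result data.
          if ("metadata".equals(response.getMetadataMap().get("responseType"))) {
            break;
          }
          // ... consume the data block carried by this response ...
        }
        channel.shutdown();
      }
    }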
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 0eb0ea3..d378820 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -18,12 +18,10 @@
  */
 package org.apache.pinot.core.query.scheduler;
 
-import static org.apache.pinot.core.query.scheduler.TestHelper.createQueryRequest;
-import static org.apache.pinot.core.query.scheduler.TestHelper.createServerQueryRequest;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.yammer.metrics.core.MetricsRegistry;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -39,12 +37,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAccumulator;
-
-import javax.annotation.Nonnull;
-
+import javax.annotation.Nullable;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -58,9 +55,11 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.yammer.metrics.core.MetricsRegistry;
+import static org.apache.pinot.core.query.scheduler.TestHelper.createQueryRequest;
+import static org.apache.pinot.core.query.scheduler.TestHelper.createServerQueryRequest;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 
 public class PrioritySchedulerTest {
@@ -251,9 +250,9 @@ public class PrioritySchedulerTest {
     }
 
     // store locally for easy access
-    public TestPriorityScheduler(@Nonnull PinotConfiguration config, @Nonnull ResourceManager resourceManager,
-        @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
-        @Nonnull LongAccumulator latestQueryTime) {
+    public TestPriorityScheduler(PinotConfiguration config, ResourceManager resourceManager,
+        QueryExecutor queryExecutor, SchedulerPriorityQueue queue, ServerMetrics metrics,
+        LongAccumulator latestQueryTime) {
       super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
     }
 
@@ -286,8 +285,7 @@ public class PrioritySchedulerTest {
   static class TestQueryExecutor implements QueryExecutor {
 
     @Override
-    public void init(@Nonnull PinotConfiguration config, @Nonnull InstanceDataManager instanceDataManager,
-        @Nonnull ServerMetrics serverMetrics) {
+    public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) {
     }
 
     @Override
@@ -298,9 +296,9 @@ public class PrioritySchedulerTest {
     public void shutDown() {
     }
 
-    @Nonnull
     @Override
-    public DataTable processQuery(@Nonnull ServerQueryRequest queryRequest, @Nonnull ExecutorService executorService) {
+    public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
+        @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
       if (useBarrier) {
         try {
           startupBarrier.await();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index fa1903c..c8c1eed 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
-import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
@@ -96,9 +95,7 @@ public class ServerInstance {
     if (serverConf.isEnableGrpcServer()) {
       int grpcPort = serverConf.getGrpcPort();
       LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
-      GrpcQueryExecutor grpcQueryExecutor =
-          new GrpcQueryExecutor(queryExecutorConfig, _instanceDataManager, _serverMetrics);
-      _grpcQueryServer = new GrpcQueryServer(grpcPort, grpcQueryExecutor);
+      _grpcQueryServer = new GrpcQueryServer(grpcPort, _queryExecutor, _serverMetrics);
     } else {
       _grpcQueryServer = null;
     }
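
As the wiring above shows, the gRPC server is only created when it is enabled in the server configuration. A hypothetical configuration snippet follows; the exact property keys behind `isEnableGrpcServer()` and `getGrpcPort()` are assumptions, so verify them against `CommonConstants.Server`:

    # Hypothetical pinot-server properties; key names are assumptions
    pinot.server.grpc.enable=true
    pinot.server.grpc.port=8090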

