You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/19 18:55:00 UTC
[incubator-pinot] branch master updated: Support streaming query in QueryExecutor (#6027)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe047fd Support streaming query in QueryExecutor (#6027)
fe047fd is described below
commit fe047fda0145603ed7a8ffb4822f1de528b3dc12
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sat Sep 19 11:54:43 2020 -0700
Support streaming query in QueryExecutor (#6027)
Add a new method `DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver);` to the `QueryExecutor` interface to support gRPC streaming queries.
Remove `GrpcQueryExecutor`, which is almost identical to `ServerQueryExecutorV1Impl`.
---
.../core/query/executor/GrpcQueryExecutor.java | 327 ---------------------
.../pinot/core/query/executor/QueryExecutor.java | 26 +-
.../query/executor/ServerQueryExecutorV1Impl.java | 29 +-
.../pinot/core/transport/grpc/GrpcQueryServer.java | 76 ++++-
.../query/scheduler/PrioritySchedulerTest.java | 36 ++-
.../pinot/server/starter/ServerInstance.java | 5 +-
6 files changed, 133 insertions(+), 366 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
deleted file mode 100644
index e64fd34..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.executor;
-
-import io.grpc.Status;
-import io.grpc.stub.StreamObserver;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metrics.ServerMeter;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.metrics.ServerQueryPhase;
-import org.apache.pinot.common.proto.PinotQueryServerGrpc;
-import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
-import org.apache.pinot.core.common.datatable.DataTableUtils;
-import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.TableDataManager;
-import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
-import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
-import org.apache.pinot.core.plan.Plan;
-import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
-import org.apache.pinot.core.plan.maker.PlanMaker;
-import org.apache.pinot.core.query.config.QueryExecutorConfig;
-import org.apache.pinot.core.query.exception.BadQueryRequestException;
-import org.apache.pinot.core.query.pruner.SegmentPrunerService;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.TimerContext;
-import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
-import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
-import org.apache.pinot.core.util.QueryOptions;
-import org.apache.pinot.core.util.trace.TraceContext;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Query executor for gRPC server requests.
- * <ul>
- * <li>
- * For streaming request, multiple (could be 0 if no data should be returned, or query encounters exception) data
- * responses will be returned, followed by one single metadata response.
- * </li>
- * <li>
- * For non-streaming request, one single response containing both data and metadata will be returned.
- * </li>
- * </ul>
- * TODO: Plug in QueryScheduler
- */
-public class GrpcQueryExecutor extends PinotQueryServerGrpc.PinotQueryServerImplBase {
- private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryExecutor.class);
-
- private final InstanceDataManager _instanceDataManager;
- private final ServerMetrics _serverMetrics;
- private final long _defaultTimeOutMs;
- private final SegmentPrunerService _segmentPrunerService;
- private final PlanMaker _planMaker;
- private final ExecutorService _executorService =
- Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
-
- public GrpcQueryExecutor(PinotConfiguration config, InstanceDataManager instanceDataManager,
- ServerMetrics serverMetrics)
- throws ConfigurationException {
- _instanceDataManager = instanceDataManager;
- _serverMetrics = serverMetrics;
- QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
- long defaultTimeoutMs = queryExecutorConfig.getTimeOut();
- _defaultTimeOutMs =
- defaultTimeoutMs > 0 ? defaultTimeoutMs : CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
- _segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
- _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
- LOGGER.info("Initialized PinotQueryHandler with default timeout: {}ms, numWorkerThreads: {}", _defaultTimeOutMs,
- ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
- }
-
- @Override
- public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
- // Deserialize the request
- ServerQueryRequest queryRequest;
- try {
- queryRequest = new ServerQueryRequest(request, _serverMetrics);
- } catch (Exception e) {
- LOGGER.error("Caught exception while deserializing the request: {}", request, e);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
- responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
- return;
- }
-
- // Process the query
- try {
- processQuery(queryRequest, responseObserver);
- } catch (Exception e) {
- LOGGER.error("Caught exception while processing request {}: {} from broker: {}", queryRequest.getRequestId(),
- queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
- responseObserver.onError(Status.INTERNAL.withCause(e).asException());
- }
- }
-
- private void processQuery(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver) {
- TimerContext timerContext = queryRequest.getTimerContext();
- TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
- if (schedulerWaitTimer != null) {
- schedulerWaitTimer.stopAndRecord();
- }
- long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
- long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs;
- TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);
-
- long requestId = queryRequest.getRequestId();
- String tableNameWithType = queryRequest.getTableNameWithType();
- QueryContext queryContext = queryRequest.getQueryContext();
- LOGGER.debug("Incoming request Id: {}, query: {}", requestId, queryContext);
- // Use the timeout passed from the request if exists, or the instance-level timeout
- long queryTimeoutMs = _defaultTimeOutMs;
- Map<String, String> queryOptions = queryContext.getQueryOptions();
- if (queryOptions != null) {
- Long timeoutFromQueryOptions = QueryOptions.getTimeoutMs(queryOptions);
- if (timeoutFromQueryOptions != null) {
- queryTimeoutMs = timeoutFromQueryOptions;
- }
- }
-
- // Query scheduler wait time already exceeds query timeout, directly return
- if (querySchedulingTimeMs >= queryTimeoutMs) {
- _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
- String errorMessage = String
- .format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs,
- queryTimeoutMs);
- DataTable dataTable = new DataTableImplV2();
- dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage));
- LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
- sendResponse(queryRequest, streamObserver, dataTable);
- return;
- }
-
- TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
- if (tableDataManager == null) {
- String errorMessage = "Failed to find table: " + tableNameWithType;
- DataTable dataTable = new DataTableImplV2();
- dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage));
- LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
- sendResponse(queryRequest, streamObserver, dataTable);
- return;
- }
-
- List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
- List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery);
-
- // When segment is removed from the IdealState:
- // 1. Controller schedules a state transition to server to turn segment OFFLINE
- // 2. Server gets the state transition, removes the segment data manager and update its CurrentState
- // 3. Controller gathers the CurrentState and update the ExternalView
- // 4. Broker watches ExternalView change and updates the routing table to stop querying the segment
- //
- // After step 2 but before step 4, segment will be missing on server side
- // TODO: Change broker to watch both IdealState and ExternalView to not query the removed segments
- int numSegmentsQueried = segmentsToQuery.size();
- int numSegmentsAcquired = segmentDataManagers.size();
- if (numSegmentsQueried > numSegmentsAcquired) {
- _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS,
- numSegmentsQueried - numSegmentsAcquired);
- }
-
- boolean enableTrace = queryRequest.isEnableTrace();
- if (enableTrace) {
- TraceContext.register(requestId);
- }
-
- int numConsumingSegmentsProcessed = 0;
- long minIndexTimeMs = Long.MAX_VALUE;
- long minIngestionTimeMs = Long.MAX_VALUE;
- // gather stats for realtime consuming segments
- for (SegmentDataManager segmentMgr : segmentDataManagers) {
- if (segmentMgr.getSegment() instanceof MutableSegment) {
- numConsumingSegmentsProcessed += 1;
- SegmentMetadata metadata = segmentMgr.getSegment().getSegmentMetadata();
- long indexedTime = metadata.getLastIndexedTimestamp();
- if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) {
- minIndexTimeMs = metadata.getLastIndexedTimestamp();
- }
- long ingestionTime = metadata.getLatestIngestionTimestamp();
- if (ingestionTime != Long.MIN_VALUE && ingestionTime < minIngestionTimeMs) {
- minIngestionTimeMs = ingestionTime;
- }
- }
- }
-
- long minConsumingFreshnessTimeMs = minIngestionTimeMs;
- if (numConsumingSegmentsProcessed > 0) {
- if (minIngestionTimeMs == Long.MAX_VALUE) {
- LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead");
- minConsumingFreshnessTimeMs = minIndexTimeMs;
- }
- LOGGER
- .debug("Querying: {} consuming segments with minConsumingFreshnessTimeMs: {}", numConsumingSegmentsProcessed,
- minConsumingFreshnessTimeMs);
- }
-
- DataTable dataTable = null;
- try {
- // Compute total docs for the table before pruning the segments
- long numTotalDocs = 0;
- for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- numTotalDocs += segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
- }
- TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
- segmentDataManagers = _segmentPrunerService.prune(tableDataManager, segmentDataManagers, queryRequest);
- segmentPruneTimer.stopAndRecord();
- int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
- LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning);
- if (numSegmentsMatchedAfterPruning == 0) {
- dataTable =
- queryRequest.isEnableStreaming() ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
- Map<String, String> metadata = dataTable.getMetadata();
- metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
- metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
- metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
- metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0");
- metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
- metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
- } else {
- TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
- List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsMatchedAfterPruning);
- for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- indexSegments.add(segmentDataManager.getSegment());
- }
- long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
- Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker
- .makeStreamingInstancePlan(indexSegments, queryContext, _executorService, streamObserver, endTimeMs)
- : _planMaker.makeInstancePlan(indexSegments, queryContext, _executorService, endTimeMs);
- planBuildTimer.stopAndRecord();
-
- TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
- dataTable = globalQueryPlan.execute();
- planExecTimer.stopAndRecord();
-
- // Update the total docs in the metadata based on un-pruned segments.
- dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, Long.toString(numTotalDocs));
- }
- } catch (Exception e) {
- _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
-
- // Do not log error for BadQueryRequestException because it's caused by bad query
- if (e instanceof BadQueryRequestException) {
- LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
- } else {
- LOGGER.error("Exception processing requestId {}", requestId, e);
- }
-
- dataTable = new DataTableImplV2();
- dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
- } finally {
- for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- tableDataManager.releaseSegment(segmentDataManager);
- }
- if (enableTrace) {
- if (dataTable != null) {
- dataTable.getMetadata().put(DataTable.TRACE_INFO_METADATA_KEY, TraceContext.getTraceInfo());
- }
- TraceContext.unregister();
- }
- }
-
- queryProcessingTimer.stopAndRecord();
- long queryProcessingTime = queryProcessingTimer.getDurationMs();
- dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried));
- dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime));
-
- if (numConsumingSegmentsProcessed > 0) {
- dataTable.getMetadata()
- .put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, Integer.toString(numConsumingSegmentsProcessed));
- dataTable.getMetadata()
- .put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, Long.toString(minConsumingFreshnessTimeMs));
- }
-
- LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime);
- LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable);
-
- // TODO: Log query stats
-
- sendResponse(queryRequest, streamObserver, dataTable);
- }
-
- private void sendResponse(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver,
- DataTable dataTable) {
- Server.ServerResponse response;
- try {
- response = queryRequest.isEnableStreaming() ? StreamingResponseUtils.getMetadataResponse(dataTable)
- : StreamingResponseUtils.getNonStreamingResponse(dataTable);
- } catch (Exception e) {
- LOGGER.error("Caught exception while constructing response from data table for request {}: {} from broker: {}",
- queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
- streamObserver.onError(Status.INTERNAL.withCause(e).asException());
- return;
- }
- streamObserver.onNext(response);
- streamObserver.onCompleted();
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
index f5ada2f..42140bc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
@@ -18,12 +18,13 @@
*/
package org.apache.pinot.core.query.executor;
+import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
-
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -53,7 +54,24 @@ public interface QueryExecutor {
void shutDown();
/**
- * Processes the query with the given executor service.
+ * Processes the non-streaming query with the given executor service.
+ */
+ default DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService) {
+ return processQuery(queryRequest, executorService, null);
+ }
+
+ /**
+ * Processes the query (streaming or non-streaming) with the given executor service.
+ * <ul>
+ * <li>
+ * For streaming request, the returned DataTable contains only the metadata. The response is streamed back via the
+ * observer.
+ * </li>
+ * <li>
+ * For non-streaming request, the returned DataTable contains both data and metadata.
+ * </li>
+ * </ul>
*/
- DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService);
+ DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
+ @Nullable StreamObserver<Server.ServerResponse> responseObserver);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index df29292..ae3d3ce 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -18,17 +18,19 @@
*/
package org.apache.pinot.core.query.executor;
-import com.google.common.base.Preconditions;
+import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -59,9 +61,9 @@ import org.slf4j.LoggerFactory;
public class ServerQueryExecutorV1Impl implements QueryExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerQueryExecutorV1Impl.class);
- private InstanceDataManager _instanceDataManager = null;
- private SegmentPrunerService _segmentPrunerService = null;
- private PlanMaker _planMaker = null;
+ private InstanceDataManager _instanceDataManager;
+ private SegmentPrunerService _segmentPrunerService;
+ private PlanMaker _planMaker;
private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
private ServerMetrics _serverMetrics;
@@ -94,7 +96,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
@Override
- public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService) {
+ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
+ @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
TimerContext timerContext = queryRequest.getTimerContext();
TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
if (schedulerWaitTimer != null) {
@@ -131,7 +134,13 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
- Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: " + tableNameWithType);
+ if (tableDataManager == null) {
+ String errorMessage = "Failed to find table: " + tableNameWithType;
+ DataTable dataTable = new DataTableImplV2();
+ dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage));
+ LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
+ return dataTable;
+ }
List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery);
@@ -199,7 +208,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning);
if (numSegmentsMatchedAfterPruning == 0) {
- dataTable = DataTableUtils.buildEmptyDataTable(queryContext);
+ // Only return metadata for streaming query
+ dataTable =
+ queryRequest.isEnableStreaming() ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
Map<String, String> metadata = dataTable.getMetadata();
metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
@@ -214,7 +225,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
indexSegments.add(segmentDataManager.getSegment());
}
long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
- Plan globalQueryPlan = _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
+ Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker
+ .makeStreamingInstancePlan(indexSegments, queryContext, executorService, responseObserver, endTimeMs)
+ : _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
planBuildTimer.stopAndRecord();
TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 7413070..64a6407 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -20,18 +20,45 @@ package org.apache.pinot.core.transport.grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
-import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.PinotQueryServerGrpc;
+import org.apache.pinot.common.proto.Server.ServerRequest;
+import org.apache.pinot.common.proto.Server.ServerResponse;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class GrpcQueryServer {
+// TODO: Plug in QueryScheduler
+public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryServer.class);
+
+ private final QueryExecutor _queryExecutor;
+ private final ServerMetrics _serverMetrics;
private final Server _server;
+ private final ExecutorService _executorService =
+ Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
- public GrpcQueryServer(int port, GrpcQueryExecutor queryExecutor) {
- _server = ServerBuilder.forPort(port).addService(queryExecutor).build();
+ public GrpcQueryServer(int port, QueryExecutor queryExecutor, ServerMetrics serverMetrics) {
+ _queryExecutor = queryExecutor;
+ _serverMetrics = serverMetrics;
+ _server = ServerBuilder.forPort(port).addService(this).build();
+ LOGGER.info("Initialized GrpcQueryServer on port: {} with numWorkerThreads: {}", port,
+ ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
}
public void start() {
+ LOGGER.info("Starting GrpcQueryServer");
try {
_server.start();
} catch (IOException e) {
@@ -40,10 +67,51 @@ public class GrpcQueryServer {
}
public void shutdown() {
+ LOGGER.info("Shutting down GrpcQueryServer");
try {
_server.shutdown().awaitTermination();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
+
+ @Override
+ public void submit(ServerRequest request, StreamObserver<ServerResponse> responseObserver) {
+ // Deserialize the request
+ ServerQueryRequest queryRequest;
+ try {
+ queryRequest = new ServerQueryRequest(request, _serverMetrics);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while deserializing the request: {}", request, e);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+ responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
+ return;
+ }
+
+ // Process the query
+ DataTable dataTable;
+ try {
+ dataTable = _queryExecutor.processQuery(queryRequest, _executorService, responseObserver);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing request {}: {} from broker: {}", queryRequest.getRequestId(),
+ queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+ responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+ return;
+ }
+
+ ServerResponse response;
+ try {
+ response = queryRequest.isEnableStreaming() ? StreamingResponseUtils.getMetadataResponse(dataTable)
+ : StreamingResponseUtils.getNonStreamingResponse(dataTable);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while constructing response from data table for request {}: {} from broker: {}",
+ queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
+ responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+ return;
+ }
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 0eb0ea3..d378820 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -18,12 +18,10 @@
*/
package org.apache.pinot.core.query.scheduler;
-import static org.apache.pinot.core.query.scheduler.TestHelper.createQueryRequest;
-import static org.apache.pinot.core.query.scheduler.TestHelper.createServerQueryRequest;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.yammer.metrics.core.MetricsRegistry;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -39,12 +37,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
-
-import javax.annotation.Nonnull;
-
+import javax.annotation.Nullable;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -58,9 +55,11 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.yammer.metrics.core.MetricsRegistry;
+import static org.apache.pinot.core.query.scheduler.TestHelper.createQueryRequest;
+import static org.apache.pinot.core.query.scheduler.TestHelper.createServerQueryRequest;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
public class PrioritySchedulerTest {
@@ -251,9 +250,9 @@ public class PrioritySchedulerTest {
}
// store locally for easy access
- public TestPriorityScheduler(@Nonnull PinotConfiguration config, @Nonnull ResourceManager resourceManager,
- @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
- @Nonnull LongAccumulator latestQueryTime) {
+ public TestPriorityScheduler(PinotConfiguration config, ResourceManager resourceManager,
+ QueryExecutor queryExecutor, SchedulerPriorityQueue queue, ServerMetrics metrics,
+ LongAccumulator latestQueryTime) {
super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
}
@@ -286,8 +285,7 @@ public class PrioritySchedulerTest {
static class TestQueryExecutor implements QueryExecutor {
@Override
- public void init(@Nonnull PinotConfiguration config, @Nonnull InstanceDataManager instanceDataManager,
- @Nonnull ServerMetrics serverMetrics) {
+ public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) {
}
@Override
@@ -298,9 +296,9 @@ public class PrioritySchedulerTest {
public void shutDown() {
}
- @Nonnull
@Override
- public DataTable processQuery(@Nonnull ServerQueryRequest queryRequest, @Nonnull ExecutorService executorService) {
+ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
+ @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
if (useBarrier) {
try {
startupBarrier.await();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index fa1903c..c8c1eed 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
-import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
@@ -96,9 +95,7 @@ public class ServerInstance {
if (serverConf.isEnableGrpcServer()) {
int grpcPort = serverConf.getGrpcPort();
LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
- GrpcQueryExecutor grpcQueryExecutor =
- new GrpcQueryExecutor(queryExecutorConfig, _instanceDataManager, _serverMetrics);
- _grpcQueryServer = new GrpcQueryServer(grpcPort, grpcQueryExecutor);
+ _grpcQueryServer = new GrpcQueryServer(grpcPort, _queryExecutor, _serverMetrics);
} else {
_grpcQueryServer = null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org