You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/19 13:24:36 UTC

[GitHub] [incubator-pinot] elonazoulay opened a new pull request #5717: Add streaming query handler

elonazoulay opened a new pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717


   ## Description
   Add a description of your PR here.
   A good description should include pointers to an issue or design document, etc.
   ## Upgrade Notes
   Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   If you have tagged this as either backward-incompat or release-notes,
   you MUST add text here that you would like to see appear in release notes of the
   next release.
   
   If you have a series of commits adding or enabling a feature, then
   add this section only in final commit that marks the feature completed.
   Refer to earlier release notes to see examples of text
   
   ## Documentation
   If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] elonazoulay commented on a change in pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
elonazoulay commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r460429801



##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
##########
@@ -18,6 +18,57 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 public class PinotQueryService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryService.class);
+
+  private final int port;
+  private final Server server;
+
+  // TODO: pass config, and parameters to initialize handler
+  public PinotQueryService(int port) {
+    this.port = port;
+    server = ServerBuilder.forPort(port)
+        .addService(new PinotQueryHandler())
+        .build();
+
+  }
+
+  /** Start serving requests. */
+  public void start() throws IOException {
+    server.start();
+    LOGGER.info("Server started. Listening on {}", port);
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {

Review comment:
       Removed, can always add it back if needed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] elonazoulay commented on a change in pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
elonazoulay commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r460430593



##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
##########
@@ -18,18 +18,258 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Handler for grpc server requests.
  * As data becomes available server responses will be added to the result stream.
  * Once the request is complete the client will aggregate the result metadata.
  */
 public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryHandler.class);
+
+  private InstanceDataManager _instanceDataManager = null;
+  private SegmentPrunerService _segmentPrunerService = null;
+  private PlanMaker _planMaker = null;
+  private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+  private ServerMetrics _serverMetrics;
+
+  public synchronized void init(PinotConfiguration config, InstanceDataManager instanceDataManager,
+                                ServerMetrics serverMetrics)
+      throws ConfigurationException {
+    _instanceDataManager = instanceDataManager;
+    _serverMetrics = serverMetrics;
+    QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
+    if (queryExecutorConfig.getTimeOut() > 0) {
+      _defaultTimeOutMs = queryExecutorConfig.getTimeOut();
+    }
+    LOGGER.info("Default timeout for query executor : {}", _defaultTimeOutMs);
+    LOGGER.info("Trying to build SegmentPrunerService");
+    _segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
+    LOGGER.info("Trying to build QueryPlanMaker");
+    _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
+    LOGGER.info("Trying to build QueryExecutorTimer");
+  }
+
+  /**
+   * Submit a streaming request to the server
+   * @param request
+   * @param responseObserver
+   */
   @Override
   public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
+    // TODO: implement, follow up whether to use ServerQueryRequest
+  }
+
+  public DataTable processQuery(ServerQueryRequest queryRequest,

Review comment:
       Will be pushing an update for this as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r485021999



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcRequestBuilder.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants.Query.Request;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+
+public class GrpcRequestBuilder {

Review comment:
       shall we consider move this to pinot-common? So other system integration won't require pinot-core as the dependency

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -190,7 +190,12 @@
     public static final String CONFIG_OF_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class";
     public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS = "pinot.server.requestHandlerFactory.class";
     public static final String CONFIG_OF_NETTY_PORT = "pinot.server.netty.port";
+    public static final String CONFIG_OF_ENABLE_GRPC_SERVER = "pinot.server.grpc.enable";
+    public static final boolean DEFAULT_ENABLE_GRPC_SERVER = false;
+    public static final String CONFIG_OF_GRPC_PORT = "pinot.server.grpc.port";

Review comment:
       I think we need to expose this port through helix eventually.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryClient.java
##########
@@ -16,20 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.starter.grpc;
+package org.apache.pinot.core.transport.grpc;
 
-import io.grpc.stub.StreamObserver;
+import io.grpc.Channel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.Iterator;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
 
-/**
- * Handler for grpc server requests.
- * As data becomes available server responses will be added to the result stream.
- * Once the request is complete the client will aggregate the result metadata.
- */
-public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
-  @Override
-  public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
 
+public class GrpcQueryClient {

Review comment:
       shall we consider move this to pinot-common? So other system integration won't require pinot-core as the dependency




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r485254555



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -190,7 +190,12 @@
     public static final String CONFIG_OF_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class";
     public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS = "pinot.server.requestHandlerFactory.class";
     public static final String CONFIG_OF_NETTY_PORT = "pinot.server.netty.port";
+    public static final String CONFIG_OF_ENABLE_GRPC_SERVER = "pinot.server.grpc.enable";
+    public static final boolean DEFAULT_ENABLE_GRPC_SERVER = false;
+    public static final String CONFIG_OF_GRPC_PORT = "pinot.server.grpc.port";

Review comment:
       Added the port info into the Helix `InstanceConfig` under `grpcPort`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r485227398



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcRequestBuilder.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants.Query.Request;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+
+public class GrpcRequestBuilder {

Review comment:
       Moved to `pinot-common`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryClient.java
##########
@@ -16,20 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.starter.grpc;
+package org.apache.pinot.core.transport.grpc;
 
-import io.grpc.stub.StreamObserver;
+import io.grpc.Channel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.Iterator;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
 
-/**
- * Handler for grpc server requests.
- * As data becomes available server responses will be added to the result stream.
- * Once the request is complete the client will aggregate the result metadata.
- */
-public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
-  @Override
-  public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
 
+public class GrpcQueryClient {

Review comment:
       Moved to `pinot-common`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r457748048



##########
File path: pinot-core/pom.xml
##########
@@ -227,5 +227,25 @@
       <artifactId>lucene-analyzers-common</artifactId>
       <version>${lucene.version}</version>
     </dependency>
+    <dependency>

Review comment:
       You don't need to import them again here as they are already imported in `pinot-common`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
+
+  private final IndexSegment _indexSegment;
+  private final TransformOperator _transformOperator;
+  private final List<ExpressionContext> _expressions;
+  private final BlockValSet[] _blockValSets;
+  private final DataSchema _dataSchema;
+  private final int _numRowsToKeep;
+  private final List<Object[]> _rows;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
+
+  private int _numDocsScanned = 0;
+
+  public StreamingSelectionOnlyOperator(
+      IndexSegment indexSegment,
+      QueryContext queryContext,
+      List<ExpressionContext> expressions,
+      TransformOperator transformOperator,
+      StreamObserver<Server.ServerResponse> streamObserver) {

Review comment:
       Let's not pass the `streamObserver` to the segment level operator because multiple segments will be processed in parallel, but calls to `streamObserver.onNext()` need to be synchronized.
   We should create a `StreamingCombineOperator` (instance level operator) to keep fetching `IntermediateResultsBlock` from this operator and handle the calls to `streamObserver.onNext()`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants.Segment.BuiltInVirtualColumn;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.EmptySelectionOperator;
+import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import org.apache.pinot.core.operator.query.StreamingSelectionOnlyOperator;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * The <code>SelectionPlanNode</code> class provides the execution plan for selection query on a single segment.
+ */
+public class StreamingSelectionPlanNode implements PlanNode {
+  private final IndexSegment _indexSegment;
+  private final QueryContext _queryContext;
+  private final List<ExpressionContext> _expressions;
+  private final TransformPlanNode _transformPlanNode;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
+
+  public StreamingSelectionPlanNode(IndexSegment indexSegment, QueryContext queryContext,

Review comment:
       Similarly here, don't pass the `streamObserver` to the segment-level plan node. Create a `StreamingCombinePlanNode`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -139,6 +143,52 @@ public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext quer
     }
   }
 
+  @Override
+  public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
+                               ExecutorService executorService,
+                               StreamObserver<Server.ServerResponse> streamObserver, long timeOutMs) {
+    List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+    for (IndexSegment indexSegment : indexSegments) {
+      planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext, streamObserver));
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit);
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+  }
+
+  @Override
+  public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext,

Review comment:
       Don't pass `streamObserver` to the segment level `PlanNode`

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
##########
@@ -18,6 +18,57 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 public class PinotQueryService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryService.class);
+
+  private final int port;
+  private final Server server;
+
+  // TODO: pass config, and parameters to initialize handler
+  public PinotQueryService(int port) {
+    this.port = port;
+    server = ServerBuilder.forPort(port)
+        .addService(new PinotQueryHandler())
+        .build();
+
+  }
+
+  /** Start serving requests. */
+  public void start() throws IOException {
+    server.start();
+    LOGGER.info("Server started. Listening on {}", port);
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {

Review comment:
       Not sure if this shutdown hook is necessary

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
##########
@@ -18,18 +18,258 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Handler for grpc server requests.
  * As data becomes available server responses will be added to the result stream.
  * Once the request is complete the client will aggregate the result metadata.
  */
 public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryHandler.class);
+
+  private InstanceDataManager _instanceDataManager = null;
+  private SegmentPrunerService _segmentPrunerService = null;
+  private PlanMaker _planMaker = null;
+  private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+  private ServerMetrics _serverMetrics;
+
+  public synchronized void init(PinotConfiguration config, InstanceDataManager instanceDataManager,
+                                ServerMetrics serverMetrics)
+      throws ConfigurationException {
+    _instanceDataManager = instanceDataManager;
+    _serverMetrics = serverMetrics;
+    QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
+    if (queryExecutorConfig.getTimeOut() > 0) {
+      _defaultTimeOutMs = queryExecutorConfig.getTimeOut();
+    }
+    LOGGER.info("Default timeout for query executor : {}", _defaultTimeOutMs);
+    LOGGER.info("Trying to build SegmentPrunerService");
+    _segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
+    LOGGER.info("Trying to build QueryPlanMaker");
+    _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
+    LOGGER.info("Trying to build QueryExecutorTimer");
+  }
+
+  /**
+   * Submit a streaming request to the server
+   * @param request
+   * @param responseObserver
+   */
   @Override
   public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
+    // TODO: implement, follow up whether to use ServerQueryRequest
+  }
+
+  public DataTable processQuery(ServerQueryRequest queryRequest,

Review comment:
       This method should not return anything (we can directly implement `submit()`)

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
+
+  private final IndexSegment _indexSegment;
+  private final TransformOperator _transformOperator;
+  private final List<ExpressionContext> _expressions;
+  private final BlockValSet[] _blockValSets;
+  private final DataSchema _dataSchema;
+  private final int _numRowsToKeep;
+  private final List<Object[]> _rows;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
+
+  private int _numDocsScanned = 0;
+
+  public StreamingSelectionOnlyOperator(
+      IndexSegment indexSegment,
+      QueryContext queryContext,
+      List<ExpressionContext> expressions,
+      TransformOperator transformOperator,
+      StreamObserver<Server.ServerResponse> streamObserver) {
+    _indexSegment = indexSegment;
+    _transformOperator = transformOperator;
+    _expressions = expressions;
+    _streamObserver = streamObserver;
+
+    int numExpressions = _expressions.size();
+    _blockValSets = new BlockValSet[numExpressions];
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      ExpressionContext expression = _expressions.get(i);
+      TransformResultMetadata expressionMetadata = _transformOperator.getResultMetadata(expression);
+      columnNames[i] = expression.toString();
+      columnDataTypes[i] =
+          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
+    }
+    _dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    _numRowsToKeep = queryContext.getLimit();
+    _rows = new ArrayList<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {

Review comment:
       For the streaming operator, each time return results from one block (i.e. remove the while loop). Returns `null` when all the blocks are returned.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> {

Review comment:
       Can you please import Pinot code style in `config/codestyle-intellij.xml` or `config/codestyle-eclipse.xml` and reformat the files?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -139,6 +143,52 @@ public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext quer
     }
   }
 
+  @Override
+  public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
+                               ExecutorService executorService,
+                               StreamObserver<Server.ServerResponse> streamObserver, long timeOutMs) {
+    List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+    for (IndexSegment indexSegment : indexSegments) {
+      planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext, streamObserver));
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit);
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+  }
+
+  @Override
+  public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext,
+                                               StreamObserver<Server.ServerResponse> streamObserver) {
+    if (QueryContextUtils.isAggregationQuery(queryContext)) {
+      // TODO: revisit, throw exception

Review comment:
       Let's throw exception for all queries other than selection only for now

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
##########
@@ -18,18 +18,258 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Handler for grpc server requests.
  * As data becomes available server responses will be added to the result stream.
  * Once the request is complete the client will aggregate the result metadata.
  */
 public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryHandler.class);
+
+  private InstanceDataManager _instanceDataManager = null;
+  private SegmentPrunerService _segmentPrunerService = null;
+  private PlanMaker _planMaker = null;
+  private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+  private ServerMetrics _serverMetrics;
+
+  public synchronized void init(PinotConfiguration config, InstanceDataManager instanceDataManager,

Review comment:
       It will be very hard to reuse the current `QueryScheduler` because that is not designed for streaming API. So for the first version we can just launch an `ExecutorService` within this class and use it to execute queries without introducing query scheduling.

##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
##########
@@ -18,6 +18,57 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 public class PinotQueryService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryService.class);
+
+  private final int port;

Review comment:
       Add `_` prefix for member variables

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";

Review comment:
       ```suggestion
     private static final String OPERATOR_NAME = "StreamingSelectionOnlyOperator";
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang merged pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] elonazoulay commented on a change in pull request #5717: Add streaming query handler

Posted by GitBox <gi...@apache.org>.
elonazoulay commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r460430544



##########
File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
##########
@@ -18,18 +18,258 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Handler for grpc server requests.
  * As data becomes available server responses will be added to the result stream.
  * Once the request is complete the client will aggregate the result metadata.
  */
 public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryHandler.class);
+
+  private InstanceDataManager _instanceDataManager = null;
+  private SegmentPrunerService _segmentPrunerService = null;
+  private PlanMaker _planMaker = null;
+  private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+  private ServerMetrics _serverMetrics;
+
+  public synchronized void init(PinotConfiguration config, InstanceDataManager instanceDataManager,

Review comment:
       Makes sense, will be pushing an update for this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org