You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/02/21 18:36:29 UTC

[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #6596: Always return a response from query execution.

mayankshriv commented on a change in pull request #6596:
URL: https://github.com/apache/incubator-pinot/pull/6596#discussion_r579845110



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
##########
@@ -306,6 +306,10 @@ private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
   protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
       ProcessingException error) {
     DataTable result = new DataTableImplV2();
+
+    Map<String, String> dataTableMetadata = result.getMetadata();

Review comment:
       Can we consider making this DataTable a static one that is reused across all failing queries, to avoid creating new objects? Or is there a case where that doesn't work?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,107 @@ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serve
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   * Always return a response even when query execution throws exception; otherwise, broker
+   * will keep waiting until timeout.
+   */
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
-    int requestSize = msg.readableBytes();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
-    byte[] requestBytes = new byte[requestSize];
-    msg.readBytes(requestBytes);
+    final long queryArrivalTimeMs = System.currentTimeMillis();
+    final int requestSize = msg.readableBytes();
 
     InstanceRequest instanceRequest = new InstanceRequest();
+    ServerQueryRequest queryRequest;
+    byte[] requestBytes = new byte[requestSize];

Review comment:
       Why not put all non-declaration statements inside the try block? That way, in the rare event any of them fail, the code will be able to handle it.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,107 @@ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serve
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   * Always return a response even when query execution throws exception; otherwise, broker
+   * will keep waiting until timeout.
+   */
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
-    int requestSize = msg.readableBytes();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
-    byte[] requestBytes = new byte[requestSize];
-    msg.readBytes(requestBytes);
+    final long queryArrivalTimeMs = System.currentTimeMillis();
+    final int requestSize = msg.readableBytes();
 
     InstanceRequest instanceRequest = new InstanceRequest();
+    ServerQueryRequest queryRequest;
+    byte[] requestBytes = new byte[requestSize];
+
     try {
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+
+      // parse instance request into a query result.
+      msg.readBytes(requestBytes);
       _deserializer.deserialize(instanceRequest, requestBytes);
+      queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+      queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+          .stopAndRecord();
     } catch (Exception e) {
-      LOGGER
-          .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
-              e);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+      LOGGER.error("Exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes), e);
+      sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), e);
       return;
     }
 
-    ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
-    queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
-        .stopAndRecord();
-
-    // NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version
+    // Submit query for execution and register callback for execution results.
     Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
       @Override
       public void onSuccess(@Nullable byte[] responseBytes) {
-        // NOTE: response bytes can be null if data table serialization throws exception
         if (responseBytes != null) {
-          long sendResponseStartTimeMs = System.currentTimeMillis();
-          int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
-          ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
-            long sendResponseEndTimeMs = System.currentTimeMillis();
-            int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
-            _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
-                TimeUnit.MILLISECONDS);
-
-            int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
-            if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-              LOGGER.info(
-                  "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
-                  queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
-            }
-          });
+          // responseBytes contains either query results or exception.
+          sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+        } else {
+          // Send exception response.
+          sendResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
+              new Exception("Null query response."));
         }
       }
 
       @Override
       public void onFailure(Throwable t) {
-        LOGGER.error("Caught exception while processing instance request", t);
-        _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+        // Send exception response.
+        LOGGER.error("Exception while processing instance request", t);
+        sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), new Exception(t));
       }
     }, MoreExecutors.directExecutor());
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOGGER.error("Caught exception while fetching instance request", cause);
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+    // Since we do not know the requestId of the original request here, there is no way for Broker to know which query
+    // request this response belongs to. Hence, Broker will continue to wait for the original request until time out.
+    // To prevent broker from waiting unncessarily,try to catch and handle all exceptions in channelRead0 method so that
+    // this function is never called.
+    LOGGER.error("Unhandled Exception in " + getClass().getCanonicalName(), cause);
+    sendResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(), new Exception(cause));
+  }
+
+  /**
+   * Send an exception back to broker as response to the query request.
+   */
+  private void sendResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs, DataTable dataTable,
+      Exception e) {
+    try {
+      Map<String, String> dataTableMetadata = dataTable.getMetadata();
+      dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId));
+
+      dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+      byte[] serializedDataTable = dataTable.toBytes();
+      sendResponse(ctx, queryArrivalTimeMs, serializedDataTable);
+
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
+    } catch (Throwable t) {
+      // Ignore since we are already handling a higher level exceptions.

Review comment:
       But then we won't be sending a response back (i.e., line 140 may not execute)?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,107 @@ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serve
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   * Always return a response even when query execution throws exception; otherwise, broker
+   * will keep waiting until timeout.
+   */
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
-    int requestSize = msg.readableBytes();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
-    byte[] requestBytes = new byte[requestSize];
-    msg.readBytes(requestBytes);
+    final long queryArrivalTimeMs = System.currentTimeMillis();
+    final int requestSize = msg.readableBytes();
 
     InstanceRequest instanceRequest = new InstanceRequest();
+    ServerQueryRequest queryRequest;
+    byte[] requestBytes = new byte[requestSize];
+
     try {
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+
+      // parse instance request into a query result.
+      msg.readBytes(requestBytes);
       _deserializer.deserialize(instanceRequest, requestBytes);
+      queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+      queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+          .stopAndRecord();
     } catch (Exception e) {
-      LOGGER
-          .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
-              e);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+      LOGGER.error("Exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes), e);
+      sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), e);
       return;
     }
 
-    ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
-    queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
-        .stopAndRecord();
-
-    // NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version
+    // Submit query for execution and register callback for execution results.
     Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
       @Override
       public void onSuccess(@Nullable byte[] responseBytes) {
-        // NOTE: response bytes can be null if data table serialization throws exception
         if (responseBytes != null) {
-          long sendResponseStartTimeMs = System.currentTimeMillis();
-          int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
-          ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
-            long sendResponseEndTimeMs = System.currentTimeMillis();
-            int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
-            _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
-                TimeUnit.MILLISECONDS);
-
-            int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
-            if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-              LOGGER.info(
-                  "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
-                  queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
-            }
-          });
+          // responseBytes contains either query results or exception.
+          sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+        } else {
+          // Send exception response.
+          sendResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
+              new Exception("Null query response."));
         }
       }
 
       @Override
       public void onFailure(Throwable t) {
-        LOGGER.error("Caught exception while processing instance request", t);
-        _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+        // Send exception response.
+        LOGGER.error("Exception while processing instance request", t);
+        sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), new Exception(t));
       }
     }, MoreExecutors.directExecutor());
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOGGER.error("Caught exception while fetching instance request", cause);
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+    // Since we do not know the requestId of the original request here, there is no way for Broker to know which query
+    // request this response belongs to. Hence, Broker will continue to wait for the original request until time out.
+    // To prevent broker from waiting unncessarily,try to catch and handle all exceptions in channelRead0 method so that
+    // this function is never called.
+    LOGGER.error("Unhandled Exception in " + getClass().getCanonicalName(), cause);
+    sendResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(), new Exception(cause));
+  }
+
+  /**
+   * Send an exception back to broker as response to the query request.
+   */
+  private void sendResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs, DataTable dataTable,

Review comment:
       Not sure I understand why we need two separate signatures for this. Can the caller not prepare the DataTable as necessary for the response (e.g., set the exception in the data table if there is one), so that this method is only responsible for sending back the response?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
##########
@@ -60,67 +64,107 @@ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serve
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   * Always return a response even when query execution throws exception; otherwise, broker
+   * will keep waiting until timeout.
+   */
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
-    int requestSize = msg.readableBytes();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
-    byte[] requestBytes = new byte[requestSize];
-    msg.readBytes(requestBytes);
+    final long queryArrivalTimeMs = System.currentTimeMillis();
+    final int requestSize = msg.readableBytes();
 
     InstanceRequest instanceRequest = new InstanceRequest();
+    ServerQueryRequest queryRequest;
+    byte[] requestBytes = new byte[requestSize];
+
     try {
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+
+      // parse instance request into a query result.
+      msg.readBytes(requestBytes);
       _deserializer.deserialize(instanceRequest, requestBytes);
+      queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+      queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+          .stopAndRecord();
     } catch (Exception e) {
-      LOGGER
-          .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
-              e);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+      LOGGER.error("Exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes), e);
+      sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), e);
       return;
     }
 
-    ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
-    queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
-        .stopAndRecord();
-
-    // NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version
+    // Submit query for execution and register callback for execution results.
     Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
       @Override
       public void onSuccess(@Nullable byte[] responseBytes) {
-        // NOTE: response bytes can be null if data table serialization throws exception
         if (responseBytes != null) {
-          long sendResponseStartTimeMs = System.currentTimeMillis();
-          int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
-          ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
-            long sendResponseEndTimeMs = System.currentTimeMillis();
-            int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
-            _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
-                TimeUnit.MILLISECONDS);
-
-            int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
-            if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-              LOGGER.info(
-                  "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
-                  queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
-            }
-          });
+          // responseBytes contains either query results or exception.
+          sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+        } else {
+          // Send exception response.
+          sendResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
+              new Exception("Null query response."));
         }
       }
 
       @Override
       public void onFailure(Throwable t) {
-        LOGGER.error("Caught exception while processing instance request", t);
-        _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+        // Send exception response.
+        LOGGER.error("Exception while processing instance request", t);
+        sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), new Exception(t));
       }
     }, MoreExecutors.directExecutor());
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOGGER.error("Caught exception while fetching instance request", cause);
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+    // Since we do not know the requestId of the original request here, there is no way for Broker to know which query

Review comment:
       Can we use ChannelHandlerContext for storing that, or is that a read-only object?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org