You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/02/25 04:49:29 UTC

[incubator-pinot] branch master updated: Always return a response from query execution. (#6596)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 98d569d  Always return a response from query execution. (#6596)
98d569d is described below

commit 98d569db6deaab6ece09c08ece5e29a33eceab0f
Author: Amrish Lal <am...@gmail.com>
AuthorDate: Wed Feb 24 20:49:09 2021 -0800

    Always return a response from query execution. (#6596)
    
    * Always return a response from query execution.
    
    * Cleanup.
    
    * Handle all exceptions in channelRead0 method.
    
    * codereview changes.
    
    * codereview changes.
    
    * Rebuild.
    
    * Add test case.
    
    * Add test case.
    
    * code review changes.
    
    * codereview changes.
    
    * code review changes.
    
    * Cleanup comments.
    
    * Rebuild.
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 -
 .../pinot/core/query/scheduler/QueryScheduler.java |   4 +
 .../core/transport/InstanceRequestHandler.java     | 150 +++++++++++++++------
 .../tests/OfflineClusterIntegrationTest.java       |  24 ++++
 4 files changed, 135 insertions(+), 44 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 39cc24e..37e166d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.Utils;
 public enum ServerMeter implements AbstractMetrics.Meter {
   QUERIES("queries", true),
   UNCAUGHT_EXCEPTIONS("exceptions", true),
-  REQUEST_FETCH_EXCEPTIONS("exceptions", true),
   REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true),
   RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true),
   SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 69c43de..6a19fbf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -306,6 +306,10 @@ public abstract class QueryScheduler {
   protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
       ProcessingException error) {
     DataTable result = new DataTableImplV2();
+
+    Map<String, String> dataTableMetadata = result.getMetadata();
+    dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(queryRequest.getRequestId()));
+
     result.addException(error);
     return Futures.immediateFuture(serializeDataTable(queryRequest, result));
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 3744326..bdcac02 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -25,17 +25,22 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,67 +65,126 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
     _serverMetrics = serverMetrics;
   }
 
+  /**
+   * Always return a response even when query execution throws exception; otherwise, broker
+   * will keep waiting until timeout.
+   */
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
-    int requestSize = msg.readableBytes();
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
-    byte[] requestBytes = new byte[requestSize];
-    msg.readBytes(requestBytes);
-
-    InstanceRequest instanceRequest = new InstanceRequest();
+    long queryArrivalTimeMs = 0;
+    InstanceRequest instanceRequest = null;
+    byte[] requestBytes = null;
+
     try {
+      // Put all code inside try block to catch all exceptions.
+      int requestSize = msg.readableBytes();
+
+      instanceRequest = new InstanceRequest();
+      ServerQueryRequest queryRequest;
+      requestBytes = new byte[requestSize];
+
+      queryArrivalTimeMs = System.currentTimeMillis();
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+
+      // Parse instance request into ServerQueryRequest.
+      msg.readBytes(requestBytes);
       _deserializer.deserialize(instanceRequest, requestBytes);
+      queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+      queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+          .stopAndRecord();
+
+      // Submit query for execution and register callback for execution results.
+      Futures.addCallback(_queryScheduler.submit(queryRequest),
+          createCallback(ctx, queryArrivalTimeMs, instanceRequest, queryRequest), MoreExecutors.directExecutor());
     } catch (Exception e) {
-      LOGGER
-          .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
-              e);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
-      return;
-    }
+      if (e instanceof TException) {
+        // Deserialization exception
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+      }
 
-    ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
-    queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
-        .stopAndRecord();
+      // Send error response
+      String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
+      long requestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
+      LOGGER.error("Exception while processing instance request: {}", hexString, e);
+      sendErrorResponse(ctx, requestId, queryArrivalTimeMs, new DataTableImplV2(), e);
+    }
+  }
 
-    // NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version
-    Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
+  private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, long queryArrivalTimeMs,
+      InstanceRequest instanceRequest, ServerQueryRequest queryRequest) {
+    return new FutureCallback<byte[]>() {
       @Override
       public void onSuccess(@Nullable byte[] responseBytes) {
-        // NOTE: response bytes can be null if data table serialization throws exception
         if (responseBytes != null) {
-          long sendResponseStartTimeMs = System.currentTimeMillis();
-          int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
-          ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
-            long sendResponseEndTimeMs = System.currentTimeMillis();
-            int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
-            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
-            _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
-                TimeUnit.MILLISECONDS);
-
-            int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
-            if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-              LOGGER.info(
-                  "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
-                  queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
-            }
-          });
+          // responseBytes contains either query results or exception.
+          sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+        } else {
+          // Send exception response.
+          sendErrorResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
+              new Exception("Null query response."));
         }
       }
 
       @Override
       public void onFailure(Throwable t) {
-        LOGGER.error("Caught exception while processing instance request", t);
-        _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+        // Send exception response.
+        LOGGER.error("Exception while processing instance request", t);
+        sendErrorResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
+            new Exception(t));
       }
-    }, MoreExecutors.directExecutor());
+    };
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOGGER.error("Caught exception while fetching instance request", cause);
-    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+    // All exceptions should be caught and handled in channelRead0 method. This is a fallback method that
+    // will only be called if for some remote reason we are unable to handle exceptions in channelRead0.
+    String message = "Unhandled Exception in " + getClass().getCanonicalName();
+    LOGGER.error(message, cause);
+    sendErrorResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(), new Exception(message, cause));
+  }
+
+  /**
+   * Send an exception back to broker as response to the query request.
+   */
+  private void sendErrorResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs,
+      DataTable dataTable, Exception e) {
+    try {
+      Map<String, String> dataTableMetadata = dataTable.getMetadata();
+      dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId));
+      dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+      byte[] serializedDataTable = dataTable.toBytes();
+      sendResponse(ctx, queryArrivalTimeMs, serializedDataTable);
+    } catch (Exception exception) {
+      LOGGER.error("Exception while sending query processing error to Broker.", exception);
+    } finally {
+      // Log query processing exception
+      LOGGER.error("Query processing error: ", e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
+    }
+  }
+
+  /**
+   * Send a response (either query results or exception) back to broker as response to the query request.
+   */
+  private void sendResponse(ChannelHandlerContext ctx, long queryArrivalTimeMs, byte[] serializedDataTable) {
+    long sendResponseStartTimeMs = System.currentTimeMillis();
+    int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
+    ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> {
+      long sendResponseEndTimeMs = System.currentTimeMillis();
+      int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, serializedDataTable.length);
+      _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
+          TimeUnit.MILLISECONDS);
+
+      int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
+      if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+        LOGGER.info(
+            "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
+            queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
+      }
+    });
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b1bb207..00876d4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import javax.validation.constraints.AssertTrue;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.exception.QueryException;
@@ -438,6 +439,29 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }, 600_000L, "Failed to generate bloom filter");
   }
 
+  /** Check if server returns error response quickly without timing out Broker. */
+  @Test
+  public void testServerErrorWithBrokerTimeout()
+    throws Exception {
+      // Set query timeout
+      long queryTimeout = 5000;
+      TableConfig tableConfig = getOfflineTableConfig();
+      tableConfig.setQueryConfig(new QueryConfig(queryTimeout));
+      updateTableConfig(tableConfig);
+
+      long startTime = System.currentTimeMillis();
+      // The query below will fail execution due to use of double quotes around value in IN clause.
+      JsonNode queryResponse = postSqlQuery("SELECT count(*) FROM mytable WHERE Dest IN (\"DFW\")");
+      String result = queryResponse.toPrettyString();
+
+      assertTrue(System.currentTimeMillis() - startTime < queryTimeout);
+      assertTrue(queryResponse.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
+
+      // Remove timeout
+      tableConfig.setQueryConfig(null);
+      updateTableConfig(tableConfig);
+  }
+
   @Test
   public void testStarTreeTriggering()
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org