You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/02/25 04:49:29 UTC
[incubator-pinot] branch master updated: Always return a response
from query execution. (#6596)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 98d569d Always return a response from query execution. (#6596)
98d569d is described below
commit 98d569db6deaab6ece09c08ece5e29a33eceab0f
Author: Amrish Lal <am...@gmail.com>
AuthorDate: Wed Feb 24 20:49:09 2021 -0800
Always return a response from query execution. (#6596)
* Always return a response from query execution.
* Cleanup.
* Handle all exceptions in chanelRead0 method.
* codereview changes.
* codereview changes.
* Rebuild.
* Add test case.
* Add test case.
* code review changes.
* codereview changes.
* code review changes.
* Cleanup comments.
* Rebuild.
---
.../apache/pinot/common/metrics/ServerMeter.java | 1 -
.../pinot/core/query/scheduler/QueryScheduler.java | 4 +
.../core/transport/InstanceRequestHandler.java | 150 +++++++++++++++------
.../tests/OfflineClusterIntegrationTest.java | 24 ++++
4 files changed, 135 insertions(+), 44 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 39cc24e..37e166d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.Utils;
public enum ServerMeter implements AbstractMetrics.Meter {
QUERIES("queries", true),
UNCAUGHT_EXCEPTIONS("exceptions", true),
- REQUEST_FETCH_EXCEPTIONS("exceptions", true),
REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true),
RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true),
SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 69c43de..6a19fbf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -306,6 +306,10 @@ public abstract class QueryScheduler {
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
ProcessingException error) {
DataTable result = new DataTableImplV2();
+
+ Map<String, String> dataTableMetadata = result.getMetadata();
+ dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(queryRequest.getRequestId()));
+
result.addException(error);
return Futures.immediateFuture(serializeDataTable(queryRequest, result));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 3744326..bdcac02 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -25,17 +25,22 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,67 +65,126 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
_serverMetrics = serverMetrics;
}
+ /**
+ * Always return a response even when query execution throws exception; otherwise, broker
+ * will keep waiting until timeout.
+ */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
- long queryArrivalTimeMs = System.currentTimeMillis();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- int requestSize = msg.readableBytes();
- _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
- byte[] requestBytes = new byte[requestSize];
- msg.readBytes(requestBytes);
-
- InstanceRequest instanceRequest = new InstanceRequest();
+ long queryArrivalTimeMs = 0;
+ InstanceRequest instanceRequest = null;
+ byte[] requestBytes = null;
+
try {
+ // Put all code inside try block to catch all exceptions.
+ int requestSize = msg.readableBytes();
+
+ instanceRequest = new InstanceRequest();
+ ServerQueryRequest queryRequest;
+ requestBytes = new byte[requestSize];
+
+ queryArrivalTimeMs = System.currentTimeMillis();
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+
+ // Parse instance request into ServerQueryRequest.
+ msg.readBytes(requestBytes);
_deserializer.deserialize(instanceRequest, requestBytes);
+ queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+ queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+ .stopAndRecord();
+
+ // Submit query for execution and register callback for execution results.
+ Futures.addCallback(_queryScheduler.submit(queryRequest),
+ createCallback(ctx, queryArrivalTimeMs, instanceRequest, queryRequest), MoreExecutors.directExecutor());
} catch (Exception e) {
- LOGGER
- .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
- e);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
- return;
- }
+ if (e instanceof TException) {
+ // Deserialization exception
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+ }
- ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
- queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
- .stopAndRecord();
+ // Send error response
+ String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
+ long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
+ LOGGER.error("Exception while processing instance request: {}", hexString, e);
+ sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new DataTableImplV2(), e);
+ }
+ }
- // NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version
- Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
+ private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, long queryArrivalTimeMs,
+ InstanceRequest instanceRequest, ServerQueryRequest queryRequest) {
+ return new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] responseBytes) {
- // NOTE: response bytes can be null if data table serialization throws exception
if (responseBytes != null) {
- long sendResponseStartTimeMs = System.currentTimeMillis();
- int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
- ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
- long sendResponseEndTimeMs = System.currentTimeMillis();
- int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
- _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
- TimeUnit.MILLISECONDS);
-
- int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
- if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
- LOGGER.info(
- "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
- queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
- }
- });
+ // responseBytes contains either query results or exception.
+ sendResponse(ctx, queryArrivalTimeMs, responseBytes);
+ } else {
+ // Send exception response.
+ sendErrorResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
+ new Exception("Null query response."));
}
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Caught exception while processing instance request", t);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+ // Send exception response.
+ LOGGER.error("Exception while processing instance request", t);
+ sendErrorResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
+ new Exception(t));
}
- }, MoreExecutors.directExecutor());
+ };
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- LOGGER.error("Caught exception while fetching instance request", cause);
- _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+ // All exceptions should be caught and handled in channelRead0 method. This is a fallback method that
+ // will only be called if for some remote reason we are unable to handle exceptions in channelRead0.
+ String message = "Unhandled Exception in " + getClass().getCanonicalName();
+ LOGGER.error(message, cause);
+ sendErrorResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(), new Exception(message, cause));
+ }
+
+ /**
+ * Send an exception back to broker as response to the query request.
+ */
+ private void sendErrorResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs,
+ DataTable dataTable, Exception e) {
+ try {
+ Map<String, String> dataTableMetadata = dataTable.getMetadata();
+ dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId));
+ dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+ byte[] serializedDataTable = dataTable.toBytes();
+ sendResponse(ctx, queryArrivalTimeMs, serializedDataTable);
+ } catch (Exception exception) {
+ LOGGER.error("Exception while sending query processing error to Broker.", exception);
+ } finally {
+ // Log query processing exception
+ LOGGER.error("Query processing error: ", e);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
+ }
+ }
+
+ /**
+ * Send a response (either query results or exception) back to broker as response to the query request.
+ */
+ private void sendResponse(ChannelHandlerContext ctx, long queryArrivalTimeMs, byte[] serializedDataTable) {
+ long sendResponseStartTimeMs = System.currentTimeMillis();
+ int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
+ ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> {
+ long sendResponseEndTimeMs = System.currentTimeMillis();
+ int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, serializedDataTable.length);
+ _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
+ TimeUnit.MILLISECONDS);
+
+ int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
+ if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+ LOGGER.info(
+ "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
+ queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
+ }
+ });
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b1bb207..00876d4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import javax.validation.constraints.AssertTrue;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.exception.QueryException;
@@ -438,6 +439,29 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}, 600_000L, "Failed to generate bloom filter");
}
+ /** Check if server returns error response quickly without timing out Broker. */
+ @Test
+ public void testServerErrorWithBrokerTimeout()
+ throws Exception {
+ // Set query timeout
+ long queryTimeout = 5000;
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.setQueryConfig(new QueryConfig(queryTimeout));
+ updateTableConfig(tableConfig);
+
+ long startTime = System.currentTimeMillis();
+ // The query below will fail execution due to use of double quotes around value in IN clause.
+ JsonNode queryResponse = postSqlQuery("SELECT count(*) FROM mytable WHERE Dest IN (\"DFW\")");
+ String result = queryResponse.toPrettyString();
+
+ assertTrue(System.currentTimeMillis() - startTime < queryTimeout);
+ assertTrue(queryResponse.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
+
+ // Remove timeout
+ tableConfig.setQueryConfig(null);
+ updateTableConfig(tableConfig);
+ }
+
@Test
public void testStarTreeTriggering()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org