You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ro...@apache.org on 2022/09/07 08:24:52 UTC
[druid] branch master updated: Add query/time metric for SQL queries from router (#12867)
This is an automated email from the ASF dual-hosted git repository.
rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa8d7f987 Add query/time metric for SQL queries from router (#12867)
7aa8d7f987 is described below
commit 7aa8d7f9875cbe950696b5c94e375ac616dd2d04
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Wed Sep 7 13:54:46 2022 +0530
Add query/time metric for SQL queries from router (#12867)
* Add query/time metric for SQL queries from router
* Fix query cancel bug when user has overriden native query-id in a SQL query
---
docs/operations/metrics.md | 5 +
.../query/DefaultGenericQueryMetricsFactory.java | 5 +
.../apache/druid/query/DefaultQueryMetrics.java | 12 ++
.../druid/query/GenericQueryMetricsFactory.java | 7 +
.../java/org/apache/druid/query/QueryMetrics.java | 12 ++
.../query/search/DefaultSearchQueryMetrics.java | 12 ++
.../druid/query/DefaultQueryMetricsTest.java | 6 +-
.../search/DefaultSearchQueryMetricsTest.java | 3 +
.../org/apache/druid/server/QueryResource.java | 3 +-
.../druid/server/AsyncQueryForwardingServlet.java | 210 ++++++++++++++++++---
.../server/AsyncQueryForwardingServletTest.java | 102 ++++++++--
.../druid/sql/calcite/run/NativeQueryMaker.java | 2 +
.../java/org/apache/druid/sql/http/SqlQuery.java | 13 ++
13 files changed, 348 insertions(+), 44 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8bc302e598..6cc013664e 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -39,6 +39,11 @@ Metrics may have additional dimensions beyond those listed above.
## Query metrics
+### Router
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`query/time`|Milliseconds taken to complete a query.|Native Query: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id.|< 1s|
+
### Broker
|Metric|Description|Dimensions|Normal Value|
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java b/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java
index e0c5b6017b..ec2662bad4 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java
@@ -46,4 +46,9 @@ public class DefaultGenericQueryMetricsFactory implements GenericQueryMetricsFac
return queryMetrics;
}
+ @Override
+ public QueryMetrics<Query<?>> makeMetrics()
+ {
+ return new DefaultQueryMetrics<>();
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index ac20cc248d..441b36d5bb 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -132,6 +132,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(query.getId()));
}
+ @Override
+ public void queryId(String queryId)
+ {
+ setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(queryId));
+ }
+
@Override
public void subQueryId(QueryType query)
{
@@ -144,6 +150,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
// Emit nothing by default.
}
+ @Override
+ public void sqlQueryId(String sqlQueryId)
+ {
+ // Emit nothing by default.
+ }
+
@Override
public void context(QueryType query)
{
diff --git a/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java b/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java
index ce9c4e19c4..91e1e29ce0 100644
--- a/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java
@@ -46,4 +46,11 @@ public interface GenericQueryMetricsFactory
* call {@link QueryMetrics#query(Query)} with the given query on the created QueryMetrics object before returning.
*/
QueryMetrics<Query<?>> makeMetrics(Query<?> query);
+
+ /**
+ * Creates a {@link QueryMetrics} which doesn't have predefined QueryMetrics subclass. This method is used
+ * by the router to build a {@link QueryMetrics} for SQL queries. It is needed since at router, there is no native
+ * query linked to a SQL query.
+ */
+ QueryMetrics<Query<?>> makeMetrics();
}
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index 4d2406c697..f4d71060bf 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -202,6 +202,12 @@ public interface QueryMetrics<QueryType extends Query<?>>
@PublicApi
void queryId(QueryType query);
+ /**
+ * Sets id of the given query as dimension.
+ */
+ @PublicApi
+ void queryId(@SuppressWarnings("UnusedParameters") String queryId);
+
/**
* Sets {@link Query#getSubQueryId()} of the given query as dimension.
*/
@@ -214,6 +220,12 @@ public interface QueryMetrics<QueryType extends Query<?>>
@PublicApi
void sqlQueryId(QueryType query);
+ /**
+ * Sets sqlQueryId as a dimension
+ */
+ @PublicApi
+ void sqlQueryId(@SuppressWarnings("UnusedParameters") String sqlQueryId);
+
/**
* Sets {@link Query#getContext()} of the given query as dimension.
*/
diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index 108518a794..2481eeb35f 100644
--- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -87,6 +87,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
throw new ISE("Unsupported method in default query metrics implementation.");
}
+ @Override
+ public void queryId(String queryId)
+ {
+ throw new ISE("Unsupported method in default query metrics implementation.");
+ }
+
@Override
public void subQueryId(SearchQuery query)
{
@@ -99,6 +105,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
throw new ISE("Unsupported method in default query metrics implementation.");
}
+ @Override
+ public void sqlQueryId(String sqlQueryId)
+ {
+ throw new ISE("Unsupported method in default query metrics implementation.");
+ }
+
@Override
public void granularity(SearchQuery query)
{
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index 7658ffd841..d512a294ef 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -67,6 +67,10 @@ public class DefaultQueryMetricsTest
.build();
queryMetrics.query(query);
queryMetrics.reportQueryTime(0).emit(serviceEmitter);
+ // No way to verify this right now since DefaultQueryMetrics implements a no-op for sqlQueryId(String) and queryId(String)
+ // This change is done to keep the code coverage tool happy by exercising the implementation
+ queryMetrics.sqlQueryId("dummy");
+ queryMetrics.queryId("dummy");
Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals(13, actualEvent.size());
Assert.assertTrue(actualEvent.containsKey("feed"));
@@ -81,7 +85,7 @@ public class DefaultQueryMetricsTest
Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL));
Assert.assertEquals("true", actualEvent.get("hasFilters"));
Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration"));
- Assert.assertEquals("", actualEvent.get(DruidMetrics.ID));
+ Assert.assertEquals("dummy", actualEvent.get(DruidMetrics.ID));
Assert.assertEquals("query/time", actualEvent.get("metric"));
Assert.assertEquals(0L, actualEvent.get("value"));
Assert.assertEquals(ImmutableMap.of("testKey", "testValue"), actualEvent.get("context"));
diff --git a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
index ae666e3902..a03781566b 100644
--- a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.search;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.CachingEmitter;
import org.apache.druid.query.DefaultQueryMetricsTest;
@@ -86,6 +87,8 @@ public class DefaultSearchQueryMetricsTest
// Metric
Assert.assertEquals("query/time", actualEvent.get("metric"));
Assert.assertEquals(0L, actualEvent.get("value"));
+
+ Assert.assertThrows(ISE.class, () -> queryMetrics.sqlQueryId("dummy"));
}
@Test
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index f2a55242ea..590347735b 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -98,6 +98,7 @@ public class QueryResource implements QueryCountStatsProvider
*/
public static final String HEADER_RESPONSE_CONTEXT = "X-Druid-Response-Context";
public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
+ public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id";
public static final String HEADER_ETAG = "ETag";
protected final QueryLifecycleFactory queryLifecycleFactory;
@@ -252,7 +253,7 @@ public class QueryResource implements QueryCountStatsProvider
},
ioReaderWriter.getResponseWriter().getResponseType()
)
- .header("X-Druid-Query-Id", queryId);
+ .header(QUERY_ID_RESPONSE_HEADER, queryId);
transferEntityTag(responseContext, responseBuilder);
diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 3668543bf4..fa3b52669b 100644
--- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
@@ -56,6 +57,7 @@ import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authenticator;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlResource;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@@ -65,12 +67,14 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
+import javax.annotation.Nullable;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -266,11 +270,16 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
handleException(response, objectMapper, e);
return;
}
- } else if (routeSqlByStrategy && isSqlQueryEndpoint && HttpMethod.POST.is(method)) {
+ } else if (isSqlQueryEndpoint && HttpMethod.POST.is(method)) {
try {
SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class);
+ inputSqlQuery = buildSqlQueryWithId(inputSqlQuery);
request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
- targetServer = hostFinder.findServerSql(inputSqlQuery);
+ if (routeSqlByStrategy) {
+ targetServer = hostFinder.findServerSql(inputSqlQuery);
+ } else {
+ targetServer = hostFinder.pickDefaultServer();
+ }
LOG.debug("Forwarding SQL query to broker [%s]", targetServer.getHost());
}
catch (IOException e) {
@@ -292,6 +301,22 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
doService(request, response);
}
+ /**
+ * Rebuilds the {@link SqlQuery} object with sqlQueryId and queryId context parameters if not present
+ * @param sqlQuery the original SqlQuery
+ * @return an updated sqlQuery object with sqlQueryId and queryId context parameters
+ */
+ private SqlQuery buildSqlQueryWithId(SqlQuery sqlQuery)
+ {
+ Map<String, Object> context = new HashMap<>(sqlQuery.getContext());
+ String sqlQueryId = (String) context.getOrDefault(BaseQuery.SQL_QUERY_ID, UUID.randomUUID().toString());
+ // set queryId to sqlQueryId if not overridden
+ String queryId = (String) context.getOrDefault(BaseQuery.QUERY_ID, sqlQueryId);
+ context.put(BaseQuery.SQL_QUERY_ID, sqlQueryId);
+ context.put(BaseQuery.QUERY_ID, queryId);
+ return sqlQuery.withOverridenContext(context);
+ }
+
/**
* Issues async query cancellation requests to all Brokers (except the given
* targetServer). Query cancellation on the targetServer is handled by the
@@ -449,12 +474,15 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
@Override
protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response)
{
- final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE);
- if (query != null) {
- return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime());
- } else {
- return super.newProxyResponseListener(request, response);
- }
+ boolean isJDBC = request.getAttribute(AVATICA_QUERY_ATTRIBUTE) != null;
+ return newMetricsEmittingProxyResponseListener(
+ request,
+ response,
+ (Query) request.getAttribute(QUERY_ATTRIBUTE),
+ (SqlQuery) request.getAttribute(SQL_QUERY_ATTRIBUTE),
+ isJDBC,
+ System.nanoTime()
+ );
}
@Override
@@ -500,11 +528,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
private Response.Listener newMetricsEmittingProxyResponseListener(
HttpServletRequest request,
HttpServletResponse response,
- Query query,
+ @Nullable Query query,
+ @Nullable SqlQuery sqlQuery,
+ boolean isJDBC,
long startNs
)
{
- return new MetricsEmittingProxyResponseListener(request, response, query, startNs);
+ return new MetricsEmittingProxyResponseListener(request, response, query, sqlQuery, isJDBC, startNs);
}
@Override
@@ -660,22 +690,28 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
private class MetricsEmittingProxyResponseListener<T> extends ProxyResponseListener
{
private final HttpServletRequest req;
- private final HttpServletResponse res;
+ @Nullable
private final Query<T> query;
+ @Nullable
+ private final SqlQuery sqlQuery;
+ private final boolean isJDBC;
private final long startNs;
public MetricsEmittingProxyResponseListener(
HttpServletRequest request,
HttpServletResponse response,
- Query<T> query,
+ @Nullable Query<T> query,
+ @Nullable SqlQuery sqlQuery,
+ boolean isJDBC,
long startNs
)
{
super(request, response);
this.req = request;
- this.res = response;
this.query = query;
+ this.sqlQuery = sqlQuery;
+ this.isJDBC = isJDBC;
this.startNs = startNs;
}
@@ -683,14 +719,63 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
public void onComplete(Result result)
{
final long requestTimeNs = System.nanoTime() - startNs;
- try {
- boolean success = result.isSucceeded();
- if (success) {
- successfulQueryCount.incrementAndGet();
- } else {
- failedQueryCount.incrementAndGet();
+ String queryId = null;
+ String sqlQueryId = null;
+ if (isJDBC) {
+ sqlQueryId = result.getResponse().getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER);
+ } else if (sqlQuery != null) {
+ sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null);
+ queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null);
+ } else if (query != null) {
+ queryId = query.getId();
+ }
+
+ // not a native or SQL query, no need to emit metrics and logs
+ if (queryId == null && sqlQueryId == null) {
+ super.onComplete(result);
+ return;
+ }
+
+ boolean success = result.isSucceeded();
+ if (success) {
+ successfulQueryCount.incrementAndGet();
+ } else {
+ failedQueryCount.incrementAndGet();
+ }
+ emitQueryTime(requestTimeNs, success, sqlQueryId, queryId);
+
+ //noinspection VariableNotUsedInsideIf
+ if (sqlQueryId != null) {
+ // SQL query doesn't have a native query translation in router. Hence, not logging the native query.
+ if (sqlQuery != null) {
+ try {
+ requestLogger.logSqlQuery(
+ RequestLogLine.forSql(
+ sqlQuery.getQuery(),
+ sqlQuery.getContext(),
+ DateTimes.nowUtc(),
+ req.getRemoteAddr(),
+ new QueryStats(
+ ImmutableMap.of(
+ "query/time",
+ TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
+ "success",
+ success
+ && result.getResponse().getStatus() == Status.OK.getStatusCode()
+ )
+ )
+ )
+ );
+ }
+ catch (IOException e) {
+ LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery);
+ }
}
- emitQueryTime(requestTimeNs, success);
+ super.onComplete(result);
+ return;
+ }
+
+ try {
requestLogger.logNativeQuery(
RequestLogLine.forNative(
query,
@@ -718,10 +803,64 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
@Override
public void onFailure(Response response, Throwable failure)
{
+ final long requestTimeNs = System.nanoTime() - startNs;
+ final String errorMessage = failure.getMessage();
+ String queryId = null;
+ String sqlQueryId = null;
+ if (isJDBC) {
+ sqlQueryId = response.getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER);
+ } else if (sqlQuery != null) {
+ sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null);
+ queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null);
+ } else if (query != null) {
+ queryId = query.getId();
+ }
+
+ // not a native or SQL query, no need to emit metrics and logs
+ if (queryId == null && sqlQueryId == null) {
+ super.onFailure(response, failure);
+ return;
+ }
+
+ failedQueryCount.incrementAndGet();
+ emitQueryTime(requestTimeNs, false, sqlQueryId, queryId);
+
+ //noinspection VariableNotUsedInsideIf
+ if (sqlQueryId != null) {
+ // SQL query doesn't have a native query translation in router. Hence, not logging the native query.
+ if (sqlQuery != null) {
+ try {
+ requestLogger.logSqlQuery(
+ RequestLogLine.forSql(
+ sqlQuery.getQuery(),
+ sqlQuery.getContext(),
+ DateTimes.nowUtc(),
+ req.getRemoteAddr(),
+ new QueryStats(
+ ImmutableMap.of(
+ "success",
+ false,
+ "exception",
+ errorMessage == null ? "no message" : errorMessage
+ )
+ )
+ )
+ );
+ }
+ catch (IOException e) {
+ LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery);
+ }
+ LOG.makeAlert(failure, "Exception handling request")
+ .addData("exception", failure.toString())
+ .addData("sqlQuery", sqlQuery)
+ .addData("peer", req.getRemoteAddr())
+ .emit();
+ }
+ super.onFailure(response, failure);
+ return;
+ }
+
try {
- final String errorMessage = failure.getMessage();
- failedQueryCount.incrementAndGet();
- emitQueryTime(System.nanoTime() - startNs, false);
requestLogger.logNativeQuery(
RequestLogLine.forNative(
query,
@@ -751,14 +890,25 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
super.onFailure(response, failure);
}
- private void emitQueryTime(long requestTimeNs, boolean success)
+ private void emitQueryTime(long requestTimeNs, boolean success, @Nullable String sqlQueryId, @Nullable String queryId)
{
- QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
- queryMetricsFactory,
- warehouse.getToolChest(query),
- query,
- req.getRemoteAddr()
- );
+ QueryMetrics queryMetrics;
+ if (sqlQueryId != null) {
+ queryMetrics = queryMetricsFactory.makeMetrics();
+ queryMetrics.remoteAddress(req.getRemoteAddr());
+ // Setting sqlQueryId and queryId dimensions to the metric
+ queryMetrics.sqlQueryId(sqlQueryId);
+ if (queryId != null) { // query id is null for JDBC SQL
+ queryMetrics.queryId(queryId);
+ }
+ } else {
+ queryMetrics = DruidMetrics.makeRequestMetrics(
+ queryMetricsFactory,
+ warehouse.getToolChest(query),
+ query,
+ req.getRemoteAddr()
+ );
+ }
queryMetrics.success(success);
queryMetrics.reportQueryTime(requestTimeNs).emit(emitter);
}
diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 526a934008..8540a4ca4c 100644
--- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -48,6 +48,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.MapQueryToolChestWarehouse;
@@ -72,6 +73,11 @@ import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.easymock.EasyMock;
import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
@@ -205,15 +211,24 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
@Test
public void testSqlQueryProxy() throws Exception
{
- final SqlQuery query = new SqlQuery("SELECT * FROM foo", ResultFormat.ARRAY, false, false, false, null, null);
+ final SqlQuery query = new SqlQuery(
+ "SELECT * FROM foo",
+ ResultFormat.ARRAY,
+ false,
+ false,
+ false,
+ ImmutableMap.of("sqlQueryId", "dummy"),
+ null
+ );
final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
- EasyMock.expect(hostFinder.findServerSql(query))
- .andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
+ EasyMock.expect(hostFinder.findServerSql(
+ query.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy")))
+ ).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
EasyMock.replay(hostFinder);
Properties properties = new Properties();
properties.setProperty("druid.router.sql.enable", "true");
- verifyServletCallsForQuery(query, true, hostFinder, properties);
+ verifyServletCallsForQuery(query, true, false, hostFinder, properties);
}
@Test
@@ -230,7 +245,21 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
EasyMock.expect(hostFinder.pickServer(query)).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
EasyMock.replay(hostFinder);
- verifyServletCallsForQuery(query, false, hostFinder, new Properties());
+ verifyServletCallsForQuery(query, false, false, hostFinder, new Properties());
+ }
+
+ @Test
+ public void testJDBCSqlProxy() throws Exception
+ {
+ final ImmutableMap<String, Object> jdbcRequest = ImmutableMap.of("connectionId", "dummy");
+
+ final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
+ EasyMock.expect(hostFinder.findServerAvatica("dummy"))
+ .andReturn(new TestServer("http", "1.2.3.4", 9999))
+ .once();
+ EasyMock.replay(hostFinder);
+
+ verifyServletCallsForQuery(jdbcRequest, false, true, hostFinder, new Properties());
}
@Test
@@ -485,13 +514,13 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
*/
private void verifyServletCallsForQuery(
Object query,
- boolean isSql,
+ boolean isNativeSql,
+ boolean isJDBCSql,
QueryHostFinder hostFinder,
Properties properties
) throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
- final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class);
final ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonMapper.writeValueAsBytes(query));
final ServletInputStream servletInputStream = new ServletInputStream()
{
@@ -525,22 +554,58 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
return b;
}
};
+ final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class);
EasyMock.expect(requestMock.getContentType()).andReturn("application/json").times(2);
requestMock.setAttribute("org.apache.druid.proxy.objectMapper", jsonMapper);
EasyMock.expectLastCall();
- EasyMock.expect(requestMock.getRequestURI()).andReturn(isSql ? "/druid/v2/sql" : "/druid/v2/");
+ EasyMock.expect(requestMock.getRequestURI())
+ .andReturn(isNativeSql ? "/druid/v2/sql" : (isJDBCSql ? "/druid/v2/sql/avatica" : "/druid/v2/"));
EasyMock.expect(requestMock.getMethod()).andReturn("POST");
- EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
+ if (isNativeSql) {
+ SqlQuery sqlQuery = (SqlQuery) query;
+ query = sqlQuery.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy"));
+ }
requestMock.setAttribute(
- isSql ? "org.apache.druid.proxy.sqlQuery" : "org.apache.druid.proxy.query",
- query
+ "org.apache.druid.proxy." + (isNativeSql ? "sqlQuery" : (isJDBCSql ? "avaticaQuery" : "query")),
+ isJDBCSql ? jsonMapper.writeValueAsBytes(query) : query
);
+ EasyMock.expectLastCall();
+ EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
+
+ // metrics related mocking
+ EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery"))
+ .andReturn(isJDBCSql ? query : null);
+ EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.query"))
+ .andReturn(isJDBCSql ? null : (isNativeSql ? null : query));
+ EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery"))
+ .andReturn(isJDBCSql ? null : (isNativeSql ? query : null));
+ EasyMock.expect(requestMock.getRemoteAddr()).andReturn("0.0.0.0:0").times(isJDBCSql ? 1 : 2);
requestMock.setAttribute("org.apache.druid.proxy.to.host", "1.2.3.4:9999");
+ EasyMock.expectLastCall();
requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http");
EasyMock.expectLastCall();
EasyMock.replay(requestMock);
final AtomicLong didService = new AtomicLong();
+ final Request proxyRequestMock = Mockito.spy(Request.class);
+ final Result result = new Result(
+ proxyRequestMock,
+ new HttpResponse(proxyRequestMock, ImmutableList.of())
+ {
+ @Override
+ public HttpFields getHeaders()
+ {
+ HttpFields httpFields = new HttpFields();
+ if (isJDBCSql) {
+ httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "jdbcDummy"));
+ } else if (isNativeSql) {
+ httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "dummy"));
+ }
+ return httpFields;
+ }
+ }
+ );
+ final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet(
new MapQueryToolChestWarehouse(ImmutableMap.of()),
jsonMapper,
@@ -548,7 +613,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
hostFinder,
null,
null,
- new NoopServiceEmitter(),
+ stubServiceEmitter,
new NoopRequestLogger(),
new DefaultGenericQueryMetricsFactory(),
new AuthenticatorMapper(ImmutableMap.of()),
@@ -568,6 +633,19 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
servlet.service(requestMock, null);
+ // NPE is expected since the listener's onComplete calls the parent class' onComplete which fails due to
+ // partial state of the servlet. Hence, only catching the exact exception to avoid possible errors.
+ // Further, the metric assertions are also done to ensure that the metrics have emitted.
+ try {
+ servlet.newProxyResponseListener(requestMock, null).onComplete(result);
+ }
+ catch (NullPointerException ignored) {
+ }
+ Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+ if (!isJDBCSql) {
+ Assert.assertEquals("dummy", stubServiceEmitter.getEvents().get(0).toMap().get("id"));
+ }
+
// This test is mostly about verifying that the servlet calls the right methods the right number of times.
EasyMock.verify(hostFinder, requestMock);
Assert.assertEquals(1, didService.get());
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
index b033bf4ba9..1a78705ab7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
@@ -180,6 +180,8 @@ public class NativeQueryMaker implements QueryMaker
final String queryId = UUID.randomUUID().toString();
plannerContext.addNativeQueryId(queryId);
query = query.withId(queryId);
+ } else {
+ plannerContext.addNativeQueryId(query.getId());
}
query = query.withSqlQueryId(plannerContext.getSqlQueryId());
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java
index 460a8be697..242df5c68b 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java
@@ -81,6 +81,19 @@ public class SqlQuery
}
}
+ public SqlQuery withOverridenContext(Map<String, Object> overridenContext)
+ {
+ return new SqlQuery(
+ getQuery(),
+ getResultFormat(),
+ includeHeader(),
+ includeTypesHeader(),
+ includeSqlTypesHeader(),
+ overridenContext,
+ getParameters()
+ );
+ }
+
@JsonProperty
public String getQuery()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org