You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ro...@apache.org on 2022/09/07 08:24:52 UTC

[druid] branch master updated: Add query/time metric for SQL queries from router (#12867)

This is an automated email from the ASF dual-hosted git repository.

rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa8d7f987 Add query/time metric for SQL queries from router (#12867)
7aa8d7f987 is described below

commit 7aa8d7f9875cbe950696b5c94e375ac616dd2d04
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Wed Sep 7 13:54:46 2022 +0530

    Add query/time metric for SQL queries from router (#12867)
    
    * Add query/time metric for SQL queries from router
    
    * Fix query cancel bug when user has overridden native query-id in a SQL query
---
 docs/operations/metrics.md                         |   5 +
 .../query/DefaultGenericQueryMetricsFactory.java   |   5 +
 .../apache/druid/query/DefaultQueryMetrics.java    |  12 ++
 .../druid/query/GenericQueryMetricsFactory.java    |   7 +
 .../java/org/apache/druid/query/QueryMetrics.java  |  12 ++
 .../query/search/DefaultSearchQueryMetrics.java    |  12 ++
 .../druid/query/DefaultQueryMetricsTest.java       |   6 +-
 .../search/DefaultSearchQueryMetricsTest.java      |   3 +
 .../org/apache/druid/server/QueryResource.java     |   3 +-
 .../druid/server/AsyncQueryForwardingServlet.java  | 210 ++++++++++++++++++---
 .../server/AsyncQueryForwardingServletTest.java    | 102 ++++++++--
 .../druid/sql/calcite/run/NativeQueryMaker.java    |   2 +
 .../java/org/apache/druid/sql/http/SqlQuery.java   |  13 ++
 13 files changed, 348 insertions(+), 44 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8bc302e598..6cc013664e 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -39,6 +39,11 @@ Metrics may have additional dimensions beyond those listed above.
 
 ## Query metrics
 
+### Router
+|Metric|Description|Dimensions|Normal Value|
+|------|-----------|----------|------------|
+|`query/time`|Milliseconds taken to complete a query.|Native Query: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id.|< 1s|
+
 ### Broker
 
 |Metric|Description|Dimensions|Normal Value|
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java b/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java
index e0c5b6017b..ec2662bad4 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultGenericQueryMetricsFactory.java
@@ -46,4 +46,9 @@ public class DefaultGenericQueryMetricsFactory implements GenericQueryMetricsFac
     return queryMetrics;
   }
 
+  @Override
+  public QueryMetrics<Query<?>> makeMetrics()
+  {
+    return new DefaultQueryMetrics<>();
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index ac20cc248d..441b36d5bb 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -132,6 +132,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
     setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(query.getId()));
   }
 
+  @Override
+  public void queryId(String queryId)
+  {
+    setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(queryId));
+  }
+
   @Override
   public void subQueryId(QueryType query)
   {
@@ -144,6 +150,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
     // Emit nothing by default.
   }
 
+  @Override
+  public void sqlQueryId(String sqlQueryId)
+  {
+    // Emit nothing by default.
+  }
+
   @Override
   public void context(QueryType query)
   {
diff --git a/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java b/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java
index ce9c4e19c4..91e1e29ce0 100644
--- a/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/GenericQueryMetricsFactory.java
@@ -46,4 +46,11 @@ public interface GenericQueryMetricsFactory
    * call {@link QueryMetrics#query(Query)} with the given query on the created QueryMetrics object before returning.
    */
   QueryMetrics<Query<?>> makeMetrics(Query<?> query);
+
+  /**
+   * Creates a {@link QueryMetrics} that is not tied to a predefined QueryMetrics subclass. This method is used
+   * by the router to build a {@link QueryMetrics} for SQL queries. It is needed because, at the router, there is no
+   * native query linked to a SQL query.
+   */
+  QueryMetrics<Query<?>> makeMetrics();
 }
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index 4d2406c697..f4d71060bf 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -202,6 +202,12 @@ public interface QueryMetrics<QueryType extends Query<?>>
   @PublicApi
   void queryId(QueryType query);
 
+  /**
+   * Sets id of the given query as dimension.
+   */
+  @PublicApi
+  void queryId(@SuppressWarnings("UnusedParameters") String queryId);
+
   /**
    * Sets {@link Query#getSubQueryId()} of the given query as dimension.
    */
@@ -214,6 +220,12 @@ public interface QueryMetrics<QueryType extends Query<?>>
   @PublicApi
   void sqlQueryId(QueryType query);
 
+  /**
+   * Sets sqlQueryId as a dimension
+   */
+  @PublicApi
+  void sqlQueryId(@SuppressWarnings("UnusedParameters") String sqlQueryId);
+
   /**
    * Sets {@link Query#getContext()} of the given query as dimension.
    */
diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index 108518a794..2481eeb35f 100644
--- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -87,6 +87,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
     throw new ISE("Unsupported method in default query metrics implementation.");
   }
 
+  @Override
+  public void queryId(String queryId)
+  {
+    throw new ISE("Unsupported method in default query metrics implementation.");
+  }
+
   @Override
   public void subQueryId(SearchQuery query)
   {
@@ -99,6 +105,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
     throw new ISE("Unsupported method in default query metrics implementation.");
   }
 
+  @Override
+  public void sqlQueryId(String sqlQueryId)
+  {
+    throw new ISE("Unsupported method in default query metrics implementation.");
+  }
+
   @Override
   public void granularity(SearchQuery query)
   {
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index 7658ffd841..d512a294ef 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -67,6 +67,10 @@ public class DefaultQueryMetricsTest
         .build();
     queryMetrics.query(query);
     queryMetrics.reportQueryTime(0).emit(serviceEmitter);
+    // sqlQueryId(String) is a no-op in DefaultQueryMetrics, so it cannot be verified here; queryId(String) sets the
+    // id dimension, which is asserted below. The sqlQueryId call only exercises the implementation for code coverage.
+    queryMetrics.sqlQueryId("dummy");
+    queryMetrics.queryId("dummy");
     Map<String, Object> actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
     Assert.assertEquals(13, actualEvent.size());
     Assert.assertTrue(actualEvent.containsKey("feed"));
@@ -81,7 +85,7 @@ public class DefaultQueryMetricsTest
     Assert.assertEquals(expectedStringIntervals, actualEvent.get(DruidMetrics.INTERVAL));
     Assert.assertEquals("true", actualEvent.get("hasFilters"));
     Assert.assertEquals(expectedIntervals.get(0).toDuration().toString(), actualEvent.get("duration"));
-    Assert.assertEquals("", actualEvent.get(DruidMetrics.ID));
+    Assert.assertEquals("dummy", actualEvent.get(DruidMetrics.ID));
     Assert.assertEquals("query/time", actualEvent.get("metric"));
     Assert.assertEquals(0L, actualEvent.get("value"));
     Assert.assertEquals(ImmutableMap.of("testKey", "testValue"), actualEvent.get("context"));
diff --git a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
index ae666e3902..a03781566b 100644
--- a/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/search/DefaultSearchQueryMetricsTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.search;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.CachingEmitter;
 import org.apache.druid.query.DefaultQueryMetricsTest;
@@ -86,6 +87,8 @@ public class DefaultSearchQueryMetricsTest
     // Metric
     Assert.assertEquals("query/time", actualEvent.get("metric"));
     Assert.assertEquals(0L, actualEvent.get("value"));
+
+    Assert.assertThrows(ISE.class, () -> queryMetrics.sqlQueryId("dummy"));
   }
 
   @Test
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index f2a55242ea..590347735b 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -98,6 +98,7 @@ public class QueryResource implements QueryCountStatsProvider
    */
   public static final String HEADER_RESPONSE_CONTEXT = "X-Druid-Response-Context";
   public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
+  public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id";
   public static final String HEADER_ETAG = "ETag";
 
   protected final QueryLifecycleFactory queryLifecycleFactory;
@@ -252,7 +253,7 @@ public class QueryResource implements QueryCountStatsProvider
                 },
                 ioReaderWriter.getResponseWriter().getResponseType()
             )
-            .header("X-Druid-Query-Id", queryId);
+            .header(QUERY_ID_RESPONSE_HEADER, queryId);
 
         transferEntityTag(responseContext, responseBuilder);
 
diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 3668543bf4..fa3b52669b 100644
--- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.GenericQueryMetricsFactory;
 import org.apache.druid.query.Query;
@@ -56,6 +57,7 @@ import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authenticator;
 import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlResource;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
@@ -65,12 +67,14 @@ import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpMethod;
 import org.eclipse.jetty.proxy.AsyncProxyServlet;
 
+import javax.annotation.Nullable;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -266,11 +270,16 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
         handleException(response, objectMapper, e);
         return;
       }
-    } else if (routeSqlByStrategy && isSqlQueryEndpoint && HttpMethod.POST.is(method)) {
+    } else if (isSqlQueryEndpoint && HttpMethod.POST.is(method)) {
       try {
         SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class);
+        inputSqlQuery = buildSqlQueryWithId(inputSqlQuery);
         request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
-        targetServer = hostFinder.findServerSql(inputSqlQuery);
+        if (routeSqlByStrategy) {
+          targetServer = hostFinder.findServerSql(inputSqlQuery);
+        } else {
+          targetServer = hostFinder.pickDefaultServer();
+        }
         LOG.debug("Forwarding SQL query to broker [%s]", targetServer.getHost());
       }
       catch (IOException e) {
@@ -292,6 +301,22 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
     doService(request, response);
   }
 
+  /**
+   * Rebuilds the {@link SqlQuery} object with sqlQueryId and queryId context parameters if not present
+   * @param sqlQuery the original SqlQuery
+   * @return an updated sqlQuery object with sqlQueryId and queryId context parameters
+   */
+  private SqlQuery buildSqlQueryWithId(SqlQuery sqlQuery)
+  {
+    Map<String, Object> context = new HashMap<>(sqlQuery.getContext());
+    String sqlQueryId = (String) context.getOrDefault(BaseQuery.SQL_QUERY_ID, UUID.randomUUID().toString());
+    // set queryId to sqlQueryId if not overridden
+    String queryId = (String) context.getOrDefault(BaseQuery.QUERY_ID, sqlQueryId);
+    context.put(BaseQuery.SQL_QUERY_ID, sqlQueryId);
+    context.put(BaseQuery.QUERY_ID, queryId);
+    return sqlQuery.withOverridenContext(context);
+  }
+
   /**
    * Issues async query cancellation requests to all Brokers (except the given
    * targetServer). Query cancellation on the targetServer is handled by the
@@ -449,12 +474,15 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
   @Override
   protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response)
   {
-    final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE);
-    if (query != null) {
-      return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime());
-    } else {
-      return super.newProxyResponseListener(request, response);
-    }
+    boolean isJDBC = request.getAttribute(AVATICA_QUERY_ATTRIBUTE) != null;
+    return newMetricsEmittingProxyResponseListener(
+        request,
+        response,
+        (Query) request.getAttribute(QUERY_ATTRIBUTE),
+        (SqlQuery) request.getAttribute(SQL_QUERY_ATTRIBUTE),
+        isJDBC,
+        System.nanoTime()
+    );
   }
 
   @Override
@@ -500,11 +528,13 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
   private Response.Listener newMetricsEmittingProxyResponseListener(
       HttpServletRequest request,
       HttpServletResponse response,
-      Query query,
+      @Nullable Query query,
+      @Nullable SqlQuery sqlQuery,
+      boolean isJDBC,
       long startNs
   )
   {
-    return new MetricsEmittingProxyResponseListener(request, response, query, startNs);
+    return new MetricsEmittingProxyResponseListener(request, response, query, sqlQuery, isJDBC, startNs);
   }
 
   @Override
@@ -660,22 +690,28 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
   private class MetricsEmittingProxyResponseListener<T> extends ProxyResponseListener
   {
     private final HttpServletRequest req;
-    private final HttpServletResponse res;
+    @Nullable
     private final Query<T> query;
+    @Nullable
+    private final SqlQuery sqlQuery;
+    private final boolean isJDBC;
     private final long startNs;
 
     public MetricsEmittingProxyResponseListener(
         HttpServletRequest request,
         HttpServletResponse response,
-        Query<T> query,
+        @Nullable Query<T> query,
+        @Nullable SqlQuery sqlQuery,
+        boolean isJDBC,
         long startNs
     )
     {
       super(request, response);
 
       this.req = request;
-      this.res = response;
       this.query = query;
+      this.sqlQuery = sqlQuery;
+      this.isJDBC = isJDBC;
       this.startNs = startNs;
     }
 
@@ -683,14 +719,63 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
     public void onComplete(Result result)
     {
       final long requestTimeNs = System.nanoTime() - startNs;
-      try {
-        boolean success = result.isSucceeded();
-        if (success) {
-          successfulQueryCount.incrementAndGet();
-        } else {
-          failedQueryCount.incrementAndGet();
+      String queryId = null;
+      String sqlQueryId = null;
+      if (isJDBC) {
+        sqlQueryId = result.getResponse().getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER);
+      } else if (sqlQuery != null) {
+        sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null);
+        queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null);
+      } else if (query != null) {
+        queryId = query.getId();
+      }
+
+      // not a native or SQL query, no need to emit metrics and logs
+      if (queryId == null && sqlQueryId == null) {
+        super.onComplete(result);
+        return;
+      }
+
+      boolean success = result.isSucceeded();
+      if (success) {
+        successfulQueryCount.incrementAndGet();
+      } else {
+        failedQueryCount.incrementAndGet();
+      }
+      emitQueryTime(requestTimeNs, success, sqlQueryId, queryId);
+
+      //noinspection VariableNotUsedInsideIf
+      if (sqlQueryId != null) {
+        // SQL query doesn't have a native query translation in router. Hence, not logging the native query.
+        if (sqlQuery != null) {
+          try {
+            requestLogger.logSqlQuery(
+                RequestLogLine.forSql(
+                    sqlQuery.getQuery(),
+                    sqlQuery.getContext(),
+                    DateTimes.nowUtc(),
+                    req.getRemoteAddr(),
+                    new QueryStats(
+                        ImmutableMap.of(
+                            "query/time",
+                            TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
+                            "success",
+                            success
+                            && result.getResponse().getStatus() == Status.OK.getStatusCode()
+                        )
+                    )
+                )
+            );
+          }
+          catch (IOException e) {
+            LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery);
+          }
         }
-        emitQueryTime(requestTimeNs, success);
+        super.onComplete(result);
+        return;
+      }
+
+      try {
         requestLogger.logNativeQuery(
             RequestLogLine.forNative(
                 query,
@@ -718,10 +803,64 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
     @Override
     public void onFailure(Response response, Throwable failure)
     {
+      final long requestTimeNs = System.nanoTime() - startNs;
+      final String errorMessage = failure.getMessage();
+      String queryId = null;
+      String sqlQueryId = null;
+      if (isJDBC) {
+        sqlQueryId = response.getHeaders().get(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER);
+      } else if (sqlQuery != null) {
+        sqlQueryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.SQL_QUERY_ID, null);
+        queryId = (String) sqlQuery.getContext().getOrDefault(BaseQuery.QUERY_ID, null);
+      } else if (query != null) {
+        queryId = query.getId();
+      }
+
+      // not a native or SQL query, no need to emit metrics and logs
+      if (queryId == null && sqlQueryId == null) {
+        super.onFailure(response, failure);
+        return;
+      }
+
+      failedQueryCount.incrementAndGet();
+      emitQueryTime(requestTimeNs, false, sqlQueryId, queryId);
+
+      //noinspection VariableNotUsedInsideIf
+      if (sqlQueryId != null) {
+        // SQL query doesn't have a native query translation in router. Hence, not logging the native query.
+        if (sqlQuery != null) {
+          try {
+            requestLogger.logSqlQuery(
+                RequestLogLine.forSql(
+                    sqlQuery.getQuery(),
+                    sqlQuery.getContext(),
+                    DateTimes.nowUtc(),
+                    req.getRemoteAddr(),
+                    new QueryStats(
+                        ImmutableMap.of(
+                            "success",
+                            false,
+                            "exception",
+                            errorMessage == null ? "no message" : errorMessage
+                        )
+                    )
+                )
+            );
+          }
+          catch (IOException e) {
+            LOG.error(e, "Unable to log SQL query [%s]!", sqlQuery);
+          }
+          LOG.makeAlert(failure, "Exception handling request")
+             .addData("exception", failure.toString())
+             .addData("sqlQuery", sqlQuery)
+             .addData("peer", req.getRemoteAddr())
+             .emit();
+        }
+        super.onFailure(response, failure);
+        return;
+      }
+
       try {
-        final String errorMessage = failure.getMessage();
-        failedQueryCount.incrementAndGet();
-        emitQueryTime(System.nanoTime() - startNs, false);
         requestLogger.logNativeQuery(
             RequestLogLine.forNative(
                 query,
@@ -751,14 +890,25 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
       super.onFailure(response, failure);
     }
 
-    private void emitQueryTime(long requestTimeNs, boolean success)
+    private void emitQueryTime(long requestTimeNs, boolean success, @Nullable String sqlQueryId, @Nullable String queryId)
     {
-      QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
-          queryMetricsFactory,
-          warehouse.getToolChest(query),
-          query,
-          req.getRemoteAddr()
-      );
+      QueryMetrics queryMetrics;
+      if (sqlQueryId != null) {
+        queryMetrics = queryMetricsFactory.makeMetrics();
+        queryMetrics.remoteAddress(req.getRemoteAddr());
+        // Setting sqlQueryId and queryId dimensions to the metric
+        queryMetrics.sqlQueryId(sqlQueryId);
+        if (queryId != null) { // query id is null for JDBC SQL
+          queryMetrics.queryId(queryId);
+        }
+      } else {
+        queryMetrics = DruidMetrics.makeRequestMetrics(
+            queryMetricsFactory,
+            warehouse.getToolChest(query),
+            query,
+            req.getRemoteAddr()
+        );
+      }
       queryMetrics.success(success);
       queryMetrics.reportQueryTime(requestTimeNs).emit(emitter);
     }
diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index 526a934008..8540a4ca4c 100644
--- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -48,6 +48,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.MapQueryToolChestWarehouse;
@@ -72,6 +73,11 @@ import org.apache.druid.sql.http.ResultFormat;
 import org.apache.druid.sql.http.SqlQuery;
 import org.easymock.EasyMock;
 import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.HandlerList;
@@ -205,15 +211,24 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
   @Test
   public void testSqlQueryProxy() throws Exception
   {
-    final SqlQuery query = new SqlQuery("SELECT * FROM foo", ResultFormat.ARRAY, false, false, false, null, null);
+    final SqlQuery query = new SqlQuery(
+        "SELECT * FROM foo",
+        ResultFormat.ARRAY,
+        false,
+        false,
+        false,
+        ImmutableMap.of("sqlQueryId", "dummy"),
+        null
+    );
     final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
-    EasyMock.expect(hostFinder.findServerSql(query))
-            .andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
+    EasyMock.expect(hostFinder.findServerSql(
+        query.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy")))
+    ).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
     EasyMock.replay(hostFinder);
 
     Properties properties = new Properties();
     properties.setProperty("druid.router.sql.enable", "true");
-    verifyServletCallsForQuery(query, true, hostFinder, properties);
+    verifyServletCallsForQuery(query, true, false, hostFinder, properties);
   }
 
   @Test
@@ -230,7 +245,21 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
     EasyMock.expect(hostFinder.pickServer(query)).andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
     EasyMock.replay(hostFinder);
 
-    verifyServletCallsForQuery(query, false, hostFinder, new Properties());
+    verifyServletCallsForQuery(query, false, false, hostFinder, new Properties());
+  }
+
+  @Test
+  public void testJDBCSqlProxy() throws Exception
+  {
+    final ImmutableMap<String, Object> jdbcRequest = ImmutableMap.of("connectionId", "dummy");
+
+    final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class);
+    EasyMock.expect(hostFinder.findServerAvatica("dummy"))
+            .andReturn(new TestServer("http", "1.2.3.4", 9999))
+            .once();
+    EasyMock.replay(hostFinder);
+
+    verifyServletCallsForQuery(jdbcRequest, false, true, hostFinder, new Properties());
   }
 
   @Test
@@ -485,13 +514,13 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
    */
   private void verifyServletCallsForQuery(
       Object query,
-      boolean isSql,
+      boolean isNativeSql,
+      boolean isJDBCSql,
       QueryHostFinder hostFinder,
       Properties properties
   ) throws Exception
   {
     final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
-    final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class);
     final ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonMapper.writeValueAsBytes(query));
     final ServletInputStream servletInputStream = new ServletInputStream()
     {
@@ -525,22 +554,58 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
         return b;
       }
     };
+    final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class);
     EasyMock.expect(requestMock.getContentType()).andReturn("application/json").times(2);
     requestMock.setAttribute("org.apache.druid.proxy.objectMapper", jsonMapper);
     EasyMock.expectLastCall();
-    EasyMock.expect(requestMock.getRequestURI()).andReturn(isSql ? "/druid/v2/sql" : "/druid/v2/");
+    EasyMock.expect(requestMock.getRequestURI())
+            .andReturn(isNativeSql ? "/druid/v2/sql" : (isJDBCSql ? "/druid/v2/sql/avatica" : "/druid/v2/"));
     EasyMock.expect(requestMock.getMethod()).andReturn("POST");
-    EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
+    if (isNativeSql) {
+      SqlQuery sqlQuery = (SqlQuery) query;
+      query = sqlQuery.withOverridenContext(ImmutableMap.of("sqlQueryId", "dummy", "queryId", "dummy"));
+    }
     requestMock.setAttribute(
-        isSql ? "org.apache.druid.proxy.sqlQuery" : "org.apache.druid.proxy.query",
-        query
+        "org.apache.druid.proxy." + (isNativeSql ? "sqlQuery" : (isJDBCSql ? "avaticaQuery" : "query")),
+        isJDBCSql ? jsonMapper.writeValueAsBytes(query) : query
     );
+    EasyMock.expectLastCall();
+    EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
+
+    // metrics related mocking
+    EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.avaticaQuery"))
+            .andReturn(isJDBCSql ? query : null);
+    EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.query"))
+            .andReturn(isJDBCSql ? null : (isNativeSql ? null : query));
+    EasyMock.expect(requestMock.getAttribute("org.apache.druid.proxy.sqlQuery"))
+            .andReturn(isJDBCSql ? null : (isNativeSql ? query : null));
+    EasyMock.expect(requestMock.getRemoteAddr()).andReturn("0.0.0.0:0").times(isJDBCSql ? 1 : 2);
     requestMock.setAttribute("org.apache.druid.proxy.to.host", "1.2.3.4:9999");
+    EasyMock.expectLastCall();
     requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http");
     EasyMock.expectLastCall();
     EasyMock.replay(requestMock);
 
     final AtomicLong didService = new AtomicLong();
+    final Request proxyRequestMock = Mockito.spy(Request.class);
+    final Result result = new Result(
+        proxyRequestMock,
+        new HttpResponse(proxyRequestMock, ImmutableList.of())
+        {
+          @Override
+          public HttpFields getHeaders()
+          {
+            HttpFields httpFields = new HttpFields();
+            if (isJDBCSql) {
+              httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "jdbcDummy"));
+            } else if (isNativeSql) {
+              httpFields.add(new HttpField("X-Druid-SQL-Query-Id", "dummy"));
+            }
+            return httpFields;
+          }
+        }
+    );
+    final StubServiceEmitter stubServiceEmitter = new StubServiceEmitter("", "");
     final AsyncQueryForwardingServlet servlet = new AsyncQueryForwardingServlet(
         new MapQueryToolChestWarehouse(ImmutableMap.of()),
         jsonMapper,
@@ -548,7 +613,7 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
         hostFinder,
         null,
         null,
-        new NoopServiceEmitter(),
+        stubServiceEmitter,
         new NoopRequestLogger(),
         new DefaultGenericQueryMetricsFactory(),
         new AuthenticatorMapper(ImmutableMap.of()),
@@ -568,6 +633,19 @@ public class AsyncQueryForwardingServletTest extends BaseJettyTest
 
     servlet.service(requestMock, null);
 
+    // NPE is expected since the listener's onComplete calls the parent class' onComplete which fails due to
+    // partial state of the servlet. Hence, only catching the exact exception to avoid possible errors.
+    // Further, the metric assertions are also done to ensure that the metrics have been emitted.
+    try {
+      servlet.newProxyResponseListener(requestMock, null).onComplete(result);
+    }
+    catch (NullPointerException ignored) {
+    }
+    Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric"));
+    if (!isJDBCSql) {
+      Assert.assertEquals("dummy", stubServiceEmitter.getEvents().get(0).toMap().get("id"));
+    }
+
     // This test is mostly about verifying that the servlet calls the right methods the right number of times.
     EasyMock.verify(hostFinder, requestMock);
     Assert.assertEquals(1, didService.get());
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
index b033bf4ba9..1a78705ab7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
@@ -180,6 +180,8 @@ public class NativeQueryMaker implements QueryMaker
       final String queryId = UUID.randomUUID().toString();
       plannerContext.addNativeQueryId(queryId);
       query = query.withId(queryId);
+    } else {
+      plannerContext.addNativeQueryId(query.getId());
     }
 
     query = query.withSqlQueryId(plannerContext.getSqlQueryId());
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java
index 460a8be697..242df5c68b 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlQuery.java
@@ -81,6 +81,19 @@ public class SqlQuery
     }
   }
 
+  public SqlQuery withOverridenContext(Map<String, Object> overridenContext)
+  {
+    return new SqlQuery(
+        getQuery(),
+        getResultFormat(),
+        includeHeader(),
+        includeTypesHeader(),
+        includeSqlTypesHeader(),
+        overridenContext,
+        getParameters()
+    );
+  }
+
   @JsonProperty
   public String getQuery()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org