You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2021/07/23 00:58:00 UTC

[druid] branch master updated: Add a new metric query/segments/count that is not emitted by default (#11394)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9767b42  Add a new metric query/segments/count that is not emitted by default (#11394)
9767b42 is described below

commit 9767b42e85a91d4e2b71b3f718d072a9b52d140d
Author: Lucas Capistrant <ca...@users.noreply.github.com>
AuthorDate: Thu Jul 22 19:57:35 2021 -0500

    Add a new metric query/segments/count that is not emitted by default (#11394)
    
    * Add a new metric query/segments/count that is not emitted by default
    
    * docs
    
    * test the default implementation of the metric
    
    * fix spelling error in docs
    
    * document the fact that query retries will result in additional metric emissions
    
    * update using recommended text from @jihoonson
---
 .../druid/benchmark/query/CachingClusteredClientBenchmark.java    | 4 +++-
 docs/operations/metrics.md                                        | 1 +
 .../apache/druid/query/movingaverage/MovingAverageQueryTest.java  | 4 +++-
 .../src/main/java/org/apache/druid/query/DefaultQueryMetrics.java | 7 +++++++
 processing/src/main/java/org/apache/druid/query/QueryMetrics.java | 5 +++++
 .../org/apache/druid/query/search/DefaultSearchQueryMetrics.java  | 6 ++++++
 .../test/java/org/apache/druid/query/DefaultQueryMetricsTest.java | 7 +++++++
 .../main/java/org/apache/druid/client/CachingClusteredClient.java | 8 +++++++-
 .../druid/client/CachingClusteredClientFunctionalityTest.java     | 4 +++-
 .../org/apache/druid/client/CachingClusteredClientPerfTest.java   | 4 +++-
 .../java/org/apache/druid/client/CachingClusteredClientTest.java  | 4 +++-
 .../druid/query/QueryRunnerBasedOnClusteredClientTestBase.java    | 4 +++-
 12 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index fe89751..36512ad 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -107,6 +107,7 @@ import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
@@ -342,7 +343,8 @@ public class CachingClusteredClientBenchmark
         processingConfig,
         forkJoinPool,
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }
 
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 71cc330..d4ca7ad 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -59,6 +59,7 @@ Available Metrics
 |`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/interrupted/count`|number of queries interrupted due to cancellation.|This metric is only available if the QueryCountStatsMonitor module is included.||
 |`query/timeout/count`|number of timed out queries.|This metric is only available if the QueryCountStatsMonitor module is included.||
+|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker.  [...]
 |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|id, nativeQueryIds, dataSource, remoteAddress, success.|< 1s|
 |`sqlQuery/bytes`|number of bytes returned in SQL query response.|id, nativeQueryIds, dataSource, remoteAddress, success.| |
 
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index f1173aa..15fdfa7 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.TimelineLookup;
 import org.hamcrest.core.IsInstanceOf;
@@ -367,7 +368,8 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
         },
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
 
     ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
index a351eeb..095fa17 100644
--- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
@@ -341,6 +341,13 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
   }
 
   @Override
+  public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount)
+  {
+    // Don't emit by default.
+    return this;
+  }
+
+  @Override
   public void emit(ServiceEmitter emitter)
   {
     checkModifiedFromOwnerThread();
diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
index 9457fbd..304ed6c 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java
@@ -271,6 +271,11 @@ public interface QueryMetrics<QueryType extends Query<?>>
   QueryMetrics<QueryType> reportQueryBytes(long byteCount);
 
   /**
+   * Registeres "segments queried count" metric.
+   */
+  QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount);
+
+  /**
    * Registers "wait time" metric.
    */
   QueryMetrics<QueryType> reportWaitTime(long timeNs);
diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
index dd52bf5..108518a 100644
--- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java
@@ -292,6 +292,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
   }
 
   @Override
+  public QueryMetrics reportQueriedSegmentCount(long segmentCount)
+  {
+    return delegateQueryMetrics.reportQueriedSegmentCount(segmentCount);
+  }
+
+  @Override
   public void emit(ServiceEmitter emitter)
   {
     delegateQueryMetrics.emit(emitter);
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
index e0fe0b4..5091ae0 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java
@@ -148,5 +148,12 @@ public class DefaultQueryMetricsTest
     actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
     Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
     Assert.assertEquals(10L, actualEvent.get("value"));
+
+    // Here we are testing that Queried Segment Count does not get emitted by the DefaultQueryMetrics and the last
+    // metric remains as query/node/bytes
+    queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter);
+    actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
+    Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
+    Assert.assertEquals(10L, actualEvent.get("value"));
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 4481849..8175e2a 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.DruidProcessingConfig;
@@ -129,6 +130,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
   private final ForkJoinPool pool;
   private final QueryScheduler scheduler;
   private final JoinableFactoryWrapper joinableFactoryWrapper;
+  private final ServiceEmitter emitter;
 
   @Inject
   public CachingClusteredClient(
@@ -142,7 +144,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
       DruidProcessingConfig processingConfig,
       @Merging ForkJoinPool pool,
       QueryScheduler scheduler,
-      JoinableFactory joinableFactory
+      JoinableFactory joinableFactory,
+      ServiceEmitter emitter
   )
   {
     this.warehouse = warehouse;
@@ -156,6 +159,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
     this.pool = pool;
     this.scheduler = scheduler;
     this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
+    this.emitter = emitter;
 
     if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
       log.warn(
@@ -369,6 +373,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
 
       query = scheduler.prioritizeAndLaneQuery(queryPlus, segmentServers);
       queryPlus = queryPlus.withQuery(query);
+      queryPlus = queryPlus.withQueryMetrics(toolChest);
+      queryPlus.getQueryMetrics().reportQueriedSegmentCount(segmentServers.size()).emit(emitter);
 
       final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = groupSegmentsByServer(segmentServers);
       LazySequence<T> mergedResultSequence = new LazySequence<>(() -> {
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 1d591e2..a897e09 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -335,7 +336,8 @@ public class CachingClusteredClientFunctionalityTest
         },
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
index ab1d6cd..218cac5 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
@@ -53,6 +53,7 @@ import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.coordination.ServerManagerTest;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -138,7 +139,8 @@ public class CachingClusteredClientPerfTest
         Mockito.mock(DruidProcessingConfig.class),
         ForkJoinPool.commonPool(),
         queryScheduler,
-        NoopJoinableFactory.INSTANCE
+        NoopJoinableFactory.INSTANCE,
+        new NoopServiceEmitter()
     );
 
     Query<SegmentDescriptor> fakeQuery = makeFakeQuery(interval);
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index cda0170..6fe08c4 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -125,6 +125,7 @@ import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.ServerTestHelper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
 import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.DataSegment;
@@ -2850,7 +2851,8 @@ public class CachingClusteredClientTest
             NoQueryLaningStrategy.INSTANCE,
             new ServerConfig()
         ),
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
index ae0c269..94c6c59 100644
--- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
+++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
@@ -52,6 +52,7 @@ import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.joda.time.Interval;
@@ -145,7 +146,8 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
         ),
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
+        new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+        new NoopServiceEmitter()
     );
     servers = new ArrayList<>();
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org