You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2020/03/30 23:26:51 UTC

[druid] branch 0.18.0 updated: add lane enforcement for joinish queries (#9563) (#9585)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.0 by this push:
     new 8d2183f  add lane enforcement for joinish queries (#9563) (#9585)
8d2183f is described below

commit 8d2183faa85cdf05c44fd8ff0499f9bb849d5938
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Mar 30 16:26:41 2020 -0700

    add lane enforcement for joinish queries (#9563) (#9585)
    
    * add lane enforcement for joinish queries
    
    * oops
    
    * style
    
    * review stuffs
---
 .../query/CachingClusteredClientBenchmark.java     |  12 +-
 extensions-contrib/moving-average-query/pom.xml    |   7 +
 .../movingaverage/MovingAverageQueryTest.java      |  11 +-
 .../apache/druid/client/SegmentServerSelector.java |  26 ++-
 .../druid/server/LocalQuerySegmentWalker.java      |  37 +++--
 .../org/apache/druid/server/QueryScheduler.java    |  13 ++
 .../CachingClusteredClientFunctionalityTest.java   |   7 +-
 .../druid/server/ClientQuerySegmentWalkerTest.java |  65 +++++++-
 .../druid/server/ObservableQueryScheduler.java     | 180 +++++++++++++++++++++
 .../org/apache/druid/server/QueryResourceTest.java |   7 +-
 .../apache/druid/server/QuerySchedulerTest.java    |  93 -----------
 .../org/apache/druid/server/QueryStackTests.java   |  20 ++-
 .../server/TestClusterQuerySegmentWalker.java      |   6 +-
 .../druid/sql/calcite/util/CalciteTests.java       |   5 +-
 .../util/SpecificSegmentsQuerySegmentWalker.java   |   7 +-
 15 files changed, 348 insertions(+), 148 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index febebbf..94b560e 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -104,11 +104,8 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
 import org.apache.druid.query.topn.TopNResultValue;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
@@ -343,12 +340,7 @@ public class CachingClusteredClientBenchmark
         new DruidHttpClientConfig(),
         processingConfig,
         forkJoinPool,
-        new QueryScheduler(
-            0,
-            ManualQueryPrioritizationStrategy.INSTANCE,
-            NoQueryLaningStrategy.INSTANCE,
-            new ServerConfig()
-        )
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
     );
   }
 
diff --git a/extensions-contrib/moving-average-query/pom.xml b/extensions-contrib/moving-average-query/pom.xml
index c5ad9ee..6c77b60 100644
--- a/extensions-contrib/moving-average-query/pom.xml
+++ b/extensions-contrib/moving-average-query/pom.xml
@@ -106,6 +106,13 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 8d76692..971f8ed 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -65,10 +65,8 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.apache.druid.server.ClientQuerySegmentWalker;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.TimelineLookup;
 import org.hamcrest.core.IsInstanceOf;
@@ -365,12 +363,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
           }
         },
         ForkJoinPool.commonPool(),
-        new QueryScheduler(
-            0,
-            ManualQueryPrioritizationStrategy.INSTANCE,
-            NoQueryLaningStrategy.INSTANCE,
-            new ServerConfig()
-        )
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
     );
 
     ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
diff --git a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
index 007b7a2..5f5de0e 100644
--- a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
@@ -19,22 +19,46 @@
 
 package org.apache.druid.client;
 
+import com.google.common.base.Preconditions;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.SegmentDescriptor;
 
+import javax.annotation.Nullable;
+
 /**
  * Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query.
  *
- * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data
+ * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data. Used
+ * by {@link org.apache.druid.server.LocalQuerySegmentWalker} on the broker for queries run on the broker itself.
  */
 public class SegmentServerSelector extends Pair<ServerSelector, SegmentDescriptor>
 {
+  /**
+   * This is for a segment hosted on a remote server, where {@link ServerSelector} may be used to pick
+   * a {@link DruidServer} to query.
+   */
   public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment)
   {
     super(server, segment);
+    Preconditions.checkNotNull(server, "ServerSelector must not be null");
+    Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
+  }
+
+  /**
+   * This is for a segment hosted locally
+   */
+  public SegmentServerSelector(SegmentDescriptor segment)
+  {
+    super(null, segment);
+    Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
   }
 
+  /**
+   * This may be null if {@link SegmentDescriptor} is locally available, but will definitely not be null for segments
+   * which must be queried remotely (e.g. {@link CachingClusteredClient})
+   */
+  @Nullable
   public ServerSelector getServer()
   {
     return lhs;
diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
index 0a01b8c..d7f39ad 100644
--- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server;
 
 import com.google.inject.Inject;
+import org.apache.druid.client.SegmentServerSelector;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -27,6 +28,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.FluentQueryRunnerBuilder;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -39,6 +41,8 @@ import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.join.Joinables;
 import org.joda.time.Interval;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.StreamSupport;
@@ -57,6 +61,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
   private final QueryRunnerFactoryConglomerate conglomerate;
   private final SegmentWrangler segmentWrangler;
   private final JoinableFactory joinableFactory;
+  private final QueryScheduler scheduler;
   private final ServiceEmitter emitter;
 
   @Inject
@@ -64,12 +69,14 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
       QueryRunnerFactoryConglomerate conglomerate,
       SegmentWrangler segmentWrangler,
       JoinableFactory joinableFactory,
+      QueryScheduler scheduler,
       ServiceEmitter emitter
   )
   {
     this.conglomerate = conglomerate;
     this.segmentWrangler = segmentWrangler;
     this.joinableFactory = joinableFactory;
+    this.scheduler = scheduler;
     this.emitter = emitter;
   }
 
@@ -82,21 +89,23 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
       throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
     }
 
-    final AtomicLong cpuAccumulator = new AtomicLong(0L);
-    final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
     final Iterable<Segment> segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals);
+    final Query<T> prioritizedAndLaned = prioritizeAndLaneQuery(query, segments);
+
+    final AtomicLong cpuAccumulator = new AtomicLong(0L);
     final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
         analysis.getPreJoinableClauses(),
         joinableFactory,
         cpuAccumulator,
-        QueryContexts.getEnableJoinFilterPushDown(query),
-        QueryContexts.getEnableJoinFilterRewrite(query),
-        QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
-        QueryContexts.getJoinFilterRewriteMaxSize(query),
-        query.getFilter() == null ? null : query.getFilter().toFilter(),
-        query.getVirtualColumns()
+        QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned),
+        QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned),
+        QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned),
+        QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned),
+        prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(),
+        prioritizedAndLaned.getVirtualColumns()
     );
 
+    final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned);
     final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
         Execs.directExecutor(),
         () -> StreamSupport.stream(segments.spliterator(), false)
@@ -107,17 +116,25 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
     // Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
     // it is already supported.
     return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest())
-        .create(baseRunner)
+        .create(scheduler.wrapQueryRunner(baseRunner))
         .applyPreMergeDecoration()
         .mergeResults()
         .applyPostMergeDecoration()
         .emitCPUTimeMetric(emitter, cpuAccumulator);
   }
-
   @Override
   public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
   {
     // SegmentWranglers only work based on intervals and cannot run with specific segments.
     throw new ISE("Cannot run with specific segments");
   }
+
+  private <T> Query<T> prioritizeAndLaneQuery(Query<T> query, Iterable<Segment> segments)
+  {
+    Set<SegmentServerSelector> segmentServerSelectors = new HashSet<>();
+    for (Segment s : segments) {
+      segmentServerSelectors.add(new SegmentServerSelector(s.getId().toDescriptor()));
+    }
+    return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors);
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index 86c9ec9..f50b50c 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -30,10 +30,12 @@ import io.github.resilience4j.bulkhead.BulkheadRegistry;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import org.apache.druid.client.SegmentServerSelector;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.server.initialization.ServerConfig;
 
@@ -141,6 +143,17 @@ public class QueryScheduler implements QueryWatcher
   }
 
   /**
+   * Returns a {@link QueryRunner} that will call {@link QueryScheduler#run} when {@link QueryRunner#run} is called.
+   */
+  public <T> QueryRunner<T> wrapQueryRunner(QueryRunner<T> baseRunner)
+  {
+    return (queryPlus, responseContext) ->
+        QueryScheduler.this.run(
+            queryPlus.getQuery(), new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext))
+        );
+  }
+
+  /**
    * Forcibly cancel all futures that have been registered to a specific query id
    */
   public boolean cancelQuery(String id)
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 779fd54..5303989 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -47,11 +47,8 @@ import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -335,7 +332,7 @@ public class CachingClusteredClientFunctionalityTest
           }
         },
         ForkJoinPool.commonPool(),
-        new QueryScheduler(0, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 2215dcb..c98c776 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -70,6 +70,8 @@ import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
+import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -151,12 +153,20 @@ public class ClientQuerySegmentWalkerTest
   // version VERSION, and shard spec SHARD_SPEC.
   private ClientQuerySegmentWalker walker;
 
+  private ObservableQueryScheduler scheduler;
+
   @Before
   public void setUp()
   {
     closer = Closer.create();
     conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
-    initWalker(ImmutableMap.of());
+    scheduler = new ObservableQueryScheduler(
+        8,
+        ManualQueryPrioritizationStrategy.INSTANCE,
+        NoQueryLaningStrategy.INSTANCE,
+        new ServerConfig()
+    );
+    initWalker(ImmutableMap.of(), scheduler);
   }
 
   @After
@@ -182,6 +192,11 @@ public class ClientQuerySegmentWalkerTest
         ImmutableList.of(ExpectedQuery.cluster(query)),
         ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
     );
+
+    Assert.assertEquals(1, scheduler.getTotalRun().get());
+    Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(1, scheduler.getTotalReleased().get());
   }
 
   @Test
@@ -200,6 +215,11 @@ public class ClientQuerySegmentWalkerTest
         ImmutableList.of(ExpectedQuery.local(query)),
         ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
     );
+
+    Assert.assertEquals(1, scheduler.getTotalRun().get());
+    Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(1, scheduler.getTotalReleased().get());
   }
 
   @Test
@@ -236,6 +256,13 @@ public class ClientQuerySegmentWalkerTest
         ),
         ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
     );
+
+    // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+    // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+    Assert.assertEquals(2, scheduler.getTotalRun().get());
+    Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(2, scheduler.getTotalReleased().get());
   }
 
   @Test
@@ -263,6 +290,11 @@ public class ClientQuerySegmentWalkerTest
         ImmutableList.of(ExpectedQuery.cluster(subquery)),
         ImmutableList.of(new Object[]{3L})
     );
+
+    Assert.assertEquals(1, scheduler.getTotalRun().get());
+    Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(1, scheduler.getTotalReleased().get());
   }
 
   @Test
@@ -299,6 +331,13 @@ public class ClientQuerySegmentWalkerTest
             new Object[]{"z", 1L}
         )
     );
+
+    // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+    // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+    Assert.assertEquals(2, scheduler.getTotalRun().get());
+    Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(2, scheduler.getTotalReleased().get());
   }
 
   @Test
@@ -351,6 +390,13 @@ public class ClientQuerySegmentWalkerTest
         ),
         ImmutableList.of(new Object[]{"y", "y", 1L})
     );
+
+    // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+    // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+    Assert.assertEquals(2, scheduler.getTotalRun().get());
+    Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+    Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+    Assert.assertEquals(2, scheduler.getTotalReleased().get());
   }
 
   @Test
@@ -408,10 +454,18 @@ public class ClientQuerySegmentWalkerTest
   }
 
   /**
-   * Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
+   * Initialize (or reinitialize) our {@link #walker} and {@link #closer} with the default no-op scheduler.
    */
   private void initWalker(final Map<String, String> serverProperties)
   {
+    initWalker(serverProperties, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
+  }
+
+  /**
+   * Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
+   */
+  private void initWalker(final Map<String, String> serverProperties, QueryScheduler schedulerForTest)
+  {
     final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
     final ServerConfig serverConfig = jsonMapper.convertValue(serverProperties, ServerConfig.class);
 
@@ -472,7 +526,7 @@ public class ClientQuerySegmentWalkerTest
                 ),
                 joinableFactory,
                 conglomerate,
-                null /* QueryScheduler */
+                schedulerForTest
             ),
             ClusterOrLocal.CLUSTER
         ),
@@ -480,8 +534,9 @@ public class ClientQuerySegmentWalkerTest
             QueryStackTests.createLocalQuerySegmentWalker(
                 conglomerate,
                 segmentWrangler,
-                joinableFactory
-            ),
+                joinableFactory,
+                schedulerForTest
+                ),
             ClusterOrLocal.LOCAL
         ),
         conglomerate,
diff --git a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java
new file mode 100644
index 0000000..638f5f2
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import org.apache.druid.client.SegmentServerSelector;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.server.initialization.ServerConfig;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link QueryScheduler} for testing, with counters on its internal functions so its operation can be observed
+ * and verified by tests
+ */
+public class ObservableQueryScheduler extends QueryScheduler
+{
+  private final AtomicLong totalAcquired;
+  private final AtomicLong totalReleased;
+  private final AtomicLong laneAcquired;
+  private final AtomicLong laneNotAcquired;
+  private final AtomicLong laneReleased;
+  private final AtomicLong totalPrioritizedAndLaned;
+  private final AtomicLong totalRun;
+
+  public ObservableQueryScheduler(
+      int totalNumThreads,
+      QueryPrioritizationStrategy prioritizationStrategy,
+      QueryLaningStrategy laningStrategy,
+      ServerConfig serverConfig
+  )
+  {
+    super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
+
+    totalAcquired = new AtomicLong();
+    totalReleased = new AtomicLong();
+    laneAcquired = new AtomicLong();
+    laneNotAcquired = new AtomicLong();
+    laneReleased = new AtomicLong();
+    totalPrioritizedAndLaned = new AtomicLong();
+    totalRun = new AtomicLong();
+  }
+
+  @Override
+  public <T> Sequence<T> run(
+      Query<?> query,
+      Sequence<T> resultSequence
+  )
+  {
+    return super.run(query, resultSequence).withBaggage(totalRun::incrementAndGet);
+  }
+
+  @Override
+  public <T> Query<T> prioritizeAndLaneQuery(
+      QueryPlus<T> queryPlus,
+      Set<SegmentServerSelector> segments
+  )
+  {
+    totalPrioritizedAndLaned.incrementAndGet();
+    return super.prioritizeAndLaneQuery(queryPlus, segments);
+  }
+
+  @Override
+  List<Bulkhead> acquireLanes(Query<?> query)
+  {
+    List<Bulkhead> bulkheads = super.acquireLanes(query);
+    if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+      totalAcquired.incrementAndGet();
+    }
+    if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+      laneAcquired.incrementAndGet();
+    }
+
+    return bulkheads;
+  }
+
+  @Override
+  void releaseLanes(List<Bulkhead> bulkheads)
+  {
+    super.releaseLanes(bulkheads);
+    if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+      totalReleased.incrementAndGet();
+    }
+    if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+      laneReleased.incrementAndGet();
+      if (bulkheads.size() == 1) {
+        laneNotAcquired.incrementAndGet();
+      }
+    }
+  }
+
+  @Override
+  void finishLanes(List<Bulkhead> bulkheads)
+  {
+    super.finishLanes(bulkheads);
+    if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+      totalReleased.incrementAndGet();
+    }
+    if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+      laneReleased.incrementAndGet();
+    }
+  }
+
+  /**
+   * Number of times that the 'total' query count semaphore was acquired
+   */
+  public AtomicLong getTotalAcquired()
+  {
+    return totalAcquired;
+  }
+
+  /**
+   * Number of times that the 'total' query count semaphore was released
+   */
+  public AtomicLong getTotalReleased()
+  {
+    return totalReleased;
+  }
+
+  /**
+   * Number of times that the query count semaphore of any lane was acquired
+   */
+  public AtomicLong getLaneAcquired()
+  {
+    return laneAcquired;
+  }
+
+  /**
+   * Number of times that the query count semaphore of any lane was acquired but the 'total' semaphore was NOT acquired
+   */
+  public AtomicLong getLaneNotAcquired()
+  {
+    return laneNotAcquired;
+  }
+
+  /**
+   * Number of times that the query count semaphore of any lane was released
+   */
+  public AtomicLong getLaneReleased()
+  {
+    return laneReleased;
+  }
+
+  /**
+   * Number of times that {@link QueryScheduler#prioritizeAndLaneQuery} was called
+   */
+  public AtomicLong getTotalPrioritizedAndLaned()
+  {
+    return totalPrioritizedAndLaned;
+  }
+
+  /**
+   * Number of times the baggage attached to a sequence returned by {@link QueryScheduler#run} was closed
+   */
+  public AtomicLong getTotalRun()
+  {
+    return totalRun;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index d17a372..5bda433 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -174,12 +174,7 @@ public class QueryResourceTest
     EasyMock.expect(testServletRequest.getHeader("Accept")).andReturn(MediaType.APPLICATION_JSON).anyTimes();
     EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes();
     EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
-    queryScheduler = new QueryScheduler(
-        8,
-        ManualQueryPrioritizationStrategy.INSTANCE,
-        NoQueryLaningStrategy.INSTANCE,
-        new ServerConfig()
-    );
+    queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER;
     testRequestLogger = new TestRequestLogger();
     queryResource = new QueryResource(
         new QueryLifecycleFactory(
diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 177be82..485275b 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -30,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.ProvisionException;
-import io.github.resilience4j.bulkhead.Bulkhead;
 import org.apache.druid.client.SegmentServerSelector;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.JsonConfigProvider;
@@ -72,7 +71,6 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class QuerySchedulerTest
 {
@@ -708,95 +706,4 @@ public class QuerySchedulerTest
     );
     return injector;
   }
-
-  private static class ObservableQueryScheduler extends QueryScheduler
-  {
-    private final AtomicLong totalAcquired;
-    private final AtomicLong totalReleased;
-    private final AtomicLong laneAcquired;
-    private final AtomicLong laneNotAcquired;
-    private final AtomicLong laneReleased;
-
-    public ObservableQueryScheduler(
-        int totalNumThreads,
-        QueryPrioritizationStrategy prioritizationStrategy,
-        QueryLaningStrategy laningStrategy,
-        ServerConfig serverConfig
-    )
-    {
-      super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
-
-      totalAcquired = new AtomicLong();
-      totalReleased = new AtomicLong();
-      laneAcquired = new AtomicLong();
-      laneNotAcquired = new AtomicLong();
-      laneReleased = new AtomicLong();
-    }
-
-    @Override
-    List<Bulkhead> acquireLanes(Query<?> query)
-    {
-      List<Bulkhead> bulkheads = super.acquireLanes(query);
-      if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
-        totalAcquired.incrementAndGet();
-      }
-      if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
-        laneAcquired.incrementAndGet();
-      }
-
-      return bulkheads;
-    }
-
-    @Override
-    void releaseLanes(List<Bulkhead> bulkheads)
-    {
-      super.releaseLanes(bulkheads);
-      if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
-        totalReleased.incrementAndGet();
-      }
-      if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
-        laneReleased.incrementAndGet();
-        if (bulkheads.size() == 1) {
-          laneNotAcquired.incrementAndGet();
-        }
-      }
-    }
-
-    @Override
-    void finishLanes(List<Bulkhead> bulkheads)
-    {
-      super.finishLanes(bulkheads);
-      if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
-        totalReleased.incrementAndGet();
-      }
-      if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
-        laneReleased.incrementAndGet();
-      }
-    }
-
-    public AtomicLong getTotalAcquired()
-    {
-      return totalAcquired;
-    }
-
-    public AtomicLong getTotalReleased()
-    {
-      return totalReleased;
-    }
-
-    public AtomicLong getLaneAcquired()
-    {
-      return laneAcquired;
-    }
-
-    public AtomicLong getLaneNotAcquired()
-    {
-      return laneNotAcquired;
-    }
-
-    public AtomicLong getLaneReleased()
-    {
-      return laneReleased;
-    }
-  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index a07e14e..1370ed7 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -64,6 +64,8 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
+import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 
 import javax.annotation.Nullable;
@@ -75,6 +77,12 @@ import java.util.Map;
  */
 public class QueryStackTests
 {
+  public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler(
+      0,
+      ManualQueryPrioritizationStrategy.INSTANCE,
+      NoQueryLaningStrategy.INSTANCE,
+      new ServerConfig()
+  );
   private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
   private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
 
@@ -148,10 +156,17 @@ public class QueryStackTests
   public static LocalQuerySegmentWalker createLocalQuerySegmentWalker(
       final QueryRunnerFactoryConglomerate conglomerate,
       final SegmentWrangler segmentWrangler,
-      final JoinableFactory joinableFactory
+      final JoinableFactory joinableFactory,
+      final QueryScheduler scheduler
   )
   {
-    return new LocalQuerySegmentWalker(conglomerate, segmentWrangler, joinableFactory, EMITTER);
+    return new LocalQuerySegmentWalker(
+        conglomerate,
+        segmentWrangler,
+        joinableFactory,
+        scheduler,
+        EMITTER
+    );
   }
 
   /**
@@ -255,4 +270,5 @@ public class QueryStackTests
 
     return conglomerate;
   }
+
 }
diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 7fef5a9..cc3a406 100644
--- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -165,11 +165,13 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
 
     // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
     // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
-    // to function properly.
+    // to function properly. SegmentServerSelector does not currently mimic CachingClusteredClient; it is using
+    // the LocalQuerySegmentWalker constructor instead, since this walker does not mimic remote DruidServer objects
+    // to actually serve the queries.
     return (theQuery, responseContext) -> {
       if (scheduler != null) {
         Set<SegmentServerSelector> segments = new HashSet<>();
-        specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec)));
+        specs.forEach(spec -> segments.add(new SegmentServerSelector(spec)));
         return scheduler.run(
             scheduler.prioritizeAndLaneQuery(theQuery, segments),
             new LazySequence<>(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 5d446c5..5ae1390 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -77,6 +77,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.server.log.NoopRequestLogger;
 import org.apache.druid.server.security.Access;
@@ -572,13 +573,13 @@ public class CalciteTests
       final File tmpDir
   )
   {
-    return createMockWalker(conglomerate, tmpDir, null);
+    return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
   }
 
   public static SpecificSegmentsQuerySegmentWalker createMockWalker(
       final QueryRunnerFactoryConglomerate conglomerate,
       final File tmpDir,
-      @Nullable final QueryScheduler scheduler
+      final QueryScheduler scheduler
   )
   {
     final QueryableIndex index1 = IndexBuilder
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 50134ff..1da0fba 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -85,7 +85,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
       final QueryRunnerFactoryConglomerate conglomerate,
       final LookupExtractorFactoryContainerProvider lookupProvider,
       @Nullable final JoinableFactory joinableFactory,
-      @Nullable final QueryScheduler scheduler
+      final QueryScheduler scheduler
   )
   {
     final JoinableFactory joinableFactoryToUse;
@@ -116,7 +116,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
                     .put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider))
                     .build()
             ),
-            joinableFactoryToUse
+            joinableFactoryToUse,
+            scheduler
         ),
         conglomerate,
         new ServerConfig()
@@ -146,7 +147,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
           }
         },
         null,
-        null
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org