You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2020/03/30 23:26:51 UTC
[druid] branch 0.18.0 updated: add lane enforcement for joinish
queries (#9563) (#9585)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.0 by this push:
new 8d2183f add lane enforcement for joinish queries (#9563) (#9585)
8d2183f is described below
commit 8d2183faa85cdf05c44fd8ff0499f9bb849d5938
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Mar 30 16:26:41 2020 -0700
add lane enforcement for joinish queries (#9563) (#9585)
* add lane enforcement for joinish queries
* oops
* style
* review stuffs
---
.../query/CachingClusteredClientBenchmark.java | 12 +-
extensions-contrib/moving-average-query/pom.xml | 7 +
.../movingaverage/MovingAverageQueryTest.java | 11 +-
.../apache/druid/client/SegmentServerSelector.java | 26 ++-
.../druid/server/LocalQuerySegmentWalker.java | 37 +++--
.../org/apache/druid/server/QueryScheduler.java | 13 ++
.../CachingClusteredClientFunctionalityTest.java | 7 +-
.../druid/server/ClientQuerySegmentWalkerTest.java | 65 +++++++-
.../druid/server/ObservableQueryScheduler.java | 180 +++++++++++++++++++++
.../org/apache/druid/server/QueryResourceTest.java | 7 +-
.../apache/druid/server/QuerySchedulerTest.java | 93 -----------
.../org/apache/druid/server/QueryStackTests.java | 20 ++-
.../server/TestClusterQuerySegmentWalker.java | 6 +-
.../druid/sql/calcite/util/CalciteTests.java | 5 +-
.../util/SpecificSegmentsQuerySegmentWalker.java | 7 +-
15 files changed, 348 insertions(+), 148 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index febebbf..94b560e 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -104,11 +104,8 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
@@ -343,12 +340,7 @@ public class CachingClusteredClientBenchmark
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool,
- new QueryScheduler(
- 0,
- ManualQueryPrioritizationStrategy.INSTANCE,
- NoQueryLaningStrategy.INSTANCE,
- new ServerConfig()
- )
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}
diff --git a/extensions-contrib/moving-average-query/pom.xml b/extensions-contrib/moving-average-query/pom.xml
index c5ad9ee..6c77b60 100644
--- a/extensions-contrib/moving-average-query/pom.xml
+++ b/extensions-contrib/moving-average-query/pom.xml
@@ -106,6 +106,13 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-server</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 8d76692..971f8ed 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -65,10 +65,8 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
@@ -365,12 +363,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
}
},
ForkJoinPool.commonPool(),
- new QueryScheduler(
- 0,
- ManualQueryPrioritizationStrategy.INSTANCE,
- NoQueryLaningStrategy.INSTANCE,
- new ServerConfig()
- )
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
diff --git a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
index 007b7a2..5f5de0e 100644
--- a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
@@ -19,22 +19,46 @@
package org.apache.druid.client;
+import com.google.common.base.Preconditions;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.SegmentDescriptor;
+import javax.annotation.Nullable;
+
/**
* Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query.
*
- * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data
+ * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data. Used
+ * by {@link org.apache.druid.server.LocalQuerySegmentWalker} on the broker for on broker queries
*/
public class SegmentServerSelector extends Pair<ServerSelector, SegmentDescriptor>
{
+ /**
+ * This is for a segment hosted on a remote server, where {@link ServerSelector} may be used to pick
+ * a {@link DruidServer} to query.
+ */
public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment)
{
super(server, segment);
+ Preconditions.checkNotNull(server, "ServerSelector must not be null");
+ Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
+ }
+
+ /**
+ * This is for a segment hosted locally
+ */
+ public SegmentServerSelector(SegmentDescriptor segment)
+ {
+ super(null, segment);
+ Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
}
+ /**
+ * This may be null if {@link SegmentDescriptor} is locally available, but will definitely not be null for segments
+ * which must be queried remotely (e.g. {@link CachingClusteredClient})
+ */
+ @Nullable
public ServerSelector getServer()
{
return lhs;
diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
index 0a01b8c..d7f39ad 100644
--- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
@@ -20,6 +20,7 @@
package org.apache.druid.server;
import com.google.inject.Inject;
+import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -27,6 +28,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -39,6 +41,8 @@ import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.joda.time.Interval;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
@@ -57,6 +61,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
private final QueryRunnerFactoryConglomerate conglomerate;
private final SegmentWrangler segmentWrangler;
private final JoinableFactory joinableFactory;
+ private final QueryScheduler scheduler;
private final ServiceEmitter emitter;
@Inject
@@ -64,12 +69,14 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
QueryRunnerFactoryConglomerate conglomerate,
SegmentWrangler segmentWrangler,
JoinableFactory joinableFactory,
+ QueryScheduler scheduler,
ServiceEmitter emitter
)
{
this.conglomerate = conglomerate;
this.segmentWrangler = segmentWrangler;
this.joinableFactory = joinableFactory;
+ this.scheduler = scheduler;
this.emitter = emitter;
}
@@ -82,21 +89,23 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
}
- final AtomicLong cpuAccumulator = new AtomicLong(0L);
- final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
final Iterable<Segment> segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals);
+ final Query<T> prioritizedAndLaned = prioritizeAndLaneQuery(query, segments);
+
+ final AtomicLong cpuAccumulator = new AtomicLong(0L);
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuAccumulator,
- QueryContexts.getEnableJoinFilterPushDown(query),
- QueryContexts.getEnableJoinFilterRewrite(query),
- QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
- QueryContexts.getJoinFilterRewriteMaxSize(query),
- query.getFilter() == null ? null : query.getFilter().toFilter(),
- query.getVirtualColumns()
+ QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned),
+ QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned),
+ QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned),
+ QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned),
+ prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(),
+ prioritizedAndLaned.getVirtualColumns()
);
+ final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned);
final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
() -> StreamSupport.stream(segments.spliterator(), false)
@@ -107,17 +116,25 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
// Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
// it is already supported.
return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest())
- .create(baseRunner)
+ .create(scheduler.wrapQueryRunner(baseRunner))
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter, cpuAccumulator);
}
-
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
// SegmentWranglers only work based on intervals and cannot run with specific segments.
throw new ISE("Cannot run with specific segments");
}
+
+ private <T> Query<T> prioritizeAndLaneQuery(Query<T> query, Iterable<Segment> segments)
+ {
+ Set<SegmentServerSelector> segmentServerSelectors = new HashSet<>();
+ for (Segment s : segments) {
+ segmentServerSelectors.add(new SegmentServerSelector(s.getId().toDescriptor()));
+ }
+ return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors);
+ }
}
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index 86c9ec9..f50b50c 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -30,10 +30,12 @@ import io.github.resilience4j.bulkhead.BulkheadRegistry;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.server.initialization.ServerConfig;
@@ -141,6 +143,17 @@ public class QueryScheduler implements QueryWatcher
}
/**
+ * Returns a {@link QueryRunner} that will call {@link QueryScheduler#run} when {@link QueryRunner#run} is called.
+ */
+ public <T> QueryRunner<T> wrapQueryRunner(QueryRunner<T> baseRunner)
+ {
+ return (queryPlus, responseContext) ->
+ QueryScheduler.this.run(
+ queryPlus.getQuery(), new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext))
+ );
+ }
+
+ /**
* Forcibly cancel all futures that have been registered to a specific query id
*/
public boolean cancelQuery(String id)
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 779fd54..5303989 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -47,11 +47,8 @@ import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -335,7 +332,7 @@ public class CachingClusteredClientFunctionalityTest
}
},
ForkJoinPool.commonPool(),
- new QueryScheduler(0, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 2215dcb..c98c776 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -70,6 +70,8 @@ import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
+import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -151,12 +153,20 @@ public class ClientQuerySegmentWalkerTest
// version VERSION, and shard spec SHARD_SPEC.
private ClientQuerySegmentWalker walker;
+ private ObservableQueryScheduler scheduler;
+
@Before
public void setUp()
{
closer = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
- initWalker(ImmutableMap.of());
+ scheduler = new ObservableQueryScheduler(
+ 8,
+ ManualQueryPrioritizationStrategy.INSTANCE,
+ NoQueryLaningStrategy.INSTANCE,
+ new ServerConfig()
+ );
+ initWalker(ImmutableMap.of(), scheduler);
}
@After
@@ -182,6 +192,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.cluster(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
+
+ Assert.assertEquals(1, scheduler.getTotalRun().get());
+ Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@@ -200,6 +215,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.local(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
+
+ Assert.assertEquals(1, scheduler.getTotalRun().get());
+ Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@@ -236,6 +256,13 @@ public class ClientQuerySegmentWalkerTest
),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
+
+ // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+ // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@@ -263,6 +290,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.cluster(subquery)),
ImmutableList.of(new Object[]{3L})
);
+
+ Assert.assertEquals(1, scheduler.getTotalRun().get());
+ Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@@ -299,6 +331,13 @@ public class ClientQuerySegmentWalkerTest
new Object[]{"z", 1L}
)
);
+
+ // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+ // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@@ -351,6 +390,13 @@ public class ClientQuerySegmentWalkerTest
),
ImmutableList.of(new Object[]{"y", "y", 1L})
);
+
+ // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+ // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@@ -408,10 +454,18 @@ public class ClientQuerySegmentWalkerTest
}
/**
- * Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
+ * Initialize (or reinitialize) our {@link #walker} and {@link #closer} with default scheduler.
*/
private void initWalker(final Map<String, String> serverProperties)
{
+ initWalker(serverProperties, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
+ }
+
+ /**
+ * Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
+ */
+ private void initWalker(final Map<String, String> serverProperties, QueryScheduler schedulerForTest)
+ {
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final ServerConfig serverConfig = jsonMapper.convertValue(serverProperties, ServerConfig.class);
@@ -472,7 +526,7 @@ public class ClientQuerySegmentWalkerTest
),
joinableFactory,
conglomerate,
- null /* QueryScheduler */
+ schedulerForTest
),
ClusterOrLocal.CLUSTER
),
@@ -480,8 +534,9 @@ public class ClientQuerySegmentWalkerTest
QueryStackTests.createLocalQuerySegmentWalker(
conglomerate,
segmentWrangler,
- joinableFactory
- ),
+ joinableFactory,
+ schedulerForTest
+ ),
ClusterOrLocal.LOCAL
),
conglomerate,
diff --git a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java
new file mode 100644
index 0000000..638f5f2
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import org.apache.druid.client.SegmentServerSelector;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.server.initialization.ServerConfig;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link QueryScheduler} for testing, with counters on its internal functions so its operation can be observed
+ * and verified by tests
+ */
+public class ObservableQueryScheduler extends QueryScheduler
+{
+ private final AtomicLong totalAcquired;
+ private final AtomicLong totalReleased;
+ private final AtomicLong laneAcquired;
+ private final AtomicLong laneNotAcquired;
+ private final AtomicLong laneReleased;
+ private final AtomicLong totalPrioritizedAndLaned;
+ private final AtomicLong totalRun;
+
+ public ObservableQueryScheduler(
+ int totalNumThreads,
+ QueryPrioritizationStrategy prioritizationStrategy,
+ QueryLaningStrategy laningStrategy,
+ ServerConfig serverConfig
+ )
+ {
+ super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
+
+ totalAcquired = new AtomicLong();
+ totalReleased = new AtomicLong();
+ laneAcquired = new AtomicLong();
+ laneNotAcquired = new AtomicLong();
+ laneReleased = new AtomicLong();
+ totalPrioritizedAndLaned = new AtomicLong();
+ totalRun = new AtomicLong();
+ }
+
+ @Override
+ public <T> Sequence<T> run(
+ Query<?> query,
+ Sequence<T> resultSequence
+ )
+ {
+ return super.run(query, resultSequence).withBaggage(totalRun::incrementAndGet);
+ }
+
+ @Override
+ public <T> Query<T> prioritizeAndLaneQuery(
+ QueryPlus<T> queryPlus,
+ Set<SegmentServerSelector> segments
+ )
+ {
+ totalPrioritizedAndLaned.incrementAndGet();
+ return super.prioritizeAndLaneQuery(queryPlus, segments);
+ }
+
+ @Override
+ List<Bulkhead> acquireLanes(Query<?> query)
+ {
+ List<Bulkhead> bulkheads = super.acquireLanes(query);
+ if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+ totalAcquired.incrementAndGet();
+ }
+ if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+ laneAcquired.incrementAndGet();
+ }
+
+ return bulkheads;
+ }
+
+ @Override
+ void releaseLanes(List<Bulkhead> bulkheads)
+ {
+ super.releaseLanes(bulkheads);
+ if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+ totalReleased.incrementAndGet();
+ }
+ if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+ laneReleased.incrementAndGet();
+ if (bulkheads.size() == 1) {
+ laneNotAcquired.incrementAndGet();
+ }
+ }
+ }
+
+ @Override
+ void finishLanes(List<Bulkhead> bulkheads)
+ {
+ super.finishLanes(bulkheads);
+ if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+ totalReleased.incrementAndGet();
+ }
+ if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+ laneReleased.incrementAndGet();
+ }
+ }
+
+ /**
+ * Number of times that 'total' query count semaphore was acquired
+ */
+ public AtomicLong getTotalAcquired()
+ {
+ return totalAcquired;
+ }
+
+ /**
+ * Number of times that 'total' query count semaphore was released
+ */
+ public AtomicLong getTotalReleased()
+ {
+ return totalReleased;
+ }
+
+ /**
+ * Number of times that the query count semaphore of any lane was acquired
+ */
+ public AtomicLong getLaneAcquired()
+ {
+ return laneAcquired;
+ }
+
+ /**
+ * Number of times that the query count semaphore of any lane was acquired but the 'total' semaphore was NOT acquired
+ */
+ public AtomicLong getLaneNotAcquired()
+ {
+ return laneNotAcquired;
+ }
+
+ /**
+ * Number of times that the query count semaphore of any lane was released
+ */
+ public AtomicLong getLaneReleased()
+ {
+ return laneReleased;
+ }
+
+ /**
+ * Number of times that {@link QueryScheduler#prioritizeAndLaneQuery} was called
+ */
+ public AtomicLong getTotalPrioritizedAndLaned()
+ {
+ return totalPrioritizedAndLaned;
+ }
+
+ /**
+ * Number of times that {@link QueryScheduler#run} was called
+ */
+ public AtomicLong getTotalRun()
+ {
+ return totalRun;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index d17a372..5bda433 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -174,12 +174,7 @@ public class QueryResourceTest
EasyMock.expect(testServletRequest.getHeader("Accept")).andReturn(MediaType.APPLICATION_JSON).anyTimes();
EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes();
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
- queryScheduler = new QueryScheduler(
- 8,
- ManualQueryPrioritizationStrategy.INSTANCE,
- NoQueryLaningStrategy.INSTANCE,
- new ServerConfig()
- );
+ queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER;
testRequestLogger = new TestRequestLogger();
queryResource = new QueryResource(
new QueryLifecycleFactory(
diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 177be82..485275b 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -30,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.ProvisionException;
-import io.github.resilience4j.bulkhead.Bulkhead;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
@@ -72,7 +71,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
public class QuerySchedulerTest
{
@@ -708,95 +706,4 @@ public class QuerySchedulerTest
);
return injector;
}
-
- private static class ObservableQueryScheduler extends QueryScheduler
- {
- private final AtomicLong totalAcquired;
- private final AtomicLong totalReleased;
- private final AtomicLong laneAcquired;
- private final AtomicLong laneNotAcquired;
- private final AtomicLong laneReleased;
-
- public ObservableQueryScheduler(
- int totalNumThreads,
- QueryPrioritizationStrategy prioritizationStrategy,
- QueryLaningStrategy laningStrategy,
- ServerConfig serverConfig
- )
- {
- super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
-
- totalAcquired = new AtomicLong();
- totalReleased = new AtomicLong();
- laneAcquired = new AtomicLong();
- laneNotAcquired = new AtomicLong();
- laneReleased = new AtomicLong();
- }
-
- @Override
- List<Bulkhead> acquireLanes(Query<?> query)
- {
- List<Bulkhead> bulkheads = super.acquireLanes(query);
- if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
- totalAcquired.incrementAndGet();
- }
- if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
- laneAcquired.incrementAndGet();
- }
-
- return bulkheads;
- }
-
- @Override
- void releaseLanes(List<Bulkhead> bulkheads)
- {
- super.releaseLanes(bulkheads);
- if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
- totalReleased.incrementAndGet();
- }
- if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
- laneReleased.incrementAndGet();
- if (bulkheads.size() == 1) {
- laneNotAcquired.incrementAndGet();
- }
- }
- }
-
- @Override
- void finishLanes(List<Bulkhead> bulkheads)
- {
- super.finishLanes(bulkheads);
- if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
- totalReleased.incrementAndGet();
- }
- if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
- laneReleased.incrementAndGet();
- }
- }
-
- public AtomicLong getTotalAcquired()
- {
- return totalAcquired;
- }
-
- public AtomicLong getTotalReleased()
- {
- return totalReleased;
- }
-
- public AtomicLong getLaneAcquired()
- {
- return laneAcquired;
- }
-
- public AtomicLong getLaneNotAcquired()
- {
- return laneNotAcquired;
- }
-
- public AtomicLong getLaneReleased()
- {
- return laneReleased;
- }
- }
}
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index a07e14e..1370ed7 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -64,6 +64,8 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
+import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import javax.annotation.Nullable;
@@ -75,6 +77,12 @@ import java.util.Map;
*/
public class QueryStackTests
{
+ public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler(
+ 0,
+ ManualQueryPrioritizationStrategy.INSTANCE,
+ NoQueryLaningStrategy.INSTANCE,
+ new ServerConfig()
+ );
private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
@@ -148,10 +156,17 @@ public class QueryStackTests
public static LocalQuerySegmentWalker createLocalQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final SegmentWrangler segmentWrangler,
- final JoinableFactory joinableFactory
+ final JoinableFactory joinableFactory,
+ final QueryScheduler scheduler
)
{
- return new LocalQuerySegmentWalker(conglomerate, segmentWrangler, joinableFactory, EMITTER);
+ return new LocalQuerySegmentWalker(
+ conglomerate,
+ segmentWrangler,
+ joinableFactory,
+ scheduler,
+ EMITTER
+ );
}
/**
@@ -255,4 +270,5 @@ public class QueryStackTests
return conglomerate;
}
+
}
diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 7fef5a9..cc3a406 100644
--- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -165,11 +165,13 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
// Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
// This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
- // to function properly.
+ // to function properly. SegmentServerSelector does not currently mimic CachingClusteredClient, it is using
+ // the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects
+ // to actually serve the queries
return (theQuery, responseContext) -> {
if (scheduler != null) {
Set<SegmentServerSelector> segments = new HashSet<>();
- specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec)));
+ specs.forEach(spec -> segments.add(new SegmentServerSelector(spec)));
return scheduler.run(
scheduler.prioritizeAndLaneQuery(theQuery, segments),
new LazySequence<>(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 5d446c5..5ae1390 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -77,6 +77,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
@@ -572,13 +573,13 @@ public class CalciteTests
final File tmpDir
)
{
- return createMockWalker(conglomerate, tmpDir, null);
+ return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final File tmpDir,
- @Nullable final QueryScheduler scheduler
+ final QueryScheduler scheduler
)
{
final QueryableIndex index1 = IndexBuilder
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 50134ff..1da0fba 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -85,7 +85,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable final JoinableFactory joinableFactory,
- @Nullable final QueryScheduler scheduler
+ final QueryScheduler scheduler
)
{
final JoinableFactory joinableFactoryToUse;
@@ -116,7 +116,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
.put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider))
.build()
),
- joinableFactoryToUse
+ joinableFactoryToUse,
+ scheduler
),
conglomerate,
new ServerConfig()
@@ -146,7 +147,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
}
},
null,
- null
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org