You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/08/17 01:24:03 UTC
[druid] branch master updated: deprecate config-magic in favor of json configuration stuff (#14695)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6b14dde50e deprecate config-magic in favor of json configuration stuff (#14695)
6b14dde50e is described below
commit 6b14dde50ef82d3ba8faddc5bc9517e4ae3c461c
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Aug 16 18:23:57 2023 -0700
deprecate config-magic in favor of json configuration stuff (#14695)
* json config based processing and broker merge configs to deprecate config-magic
---
.../query/CachingClusteredClientBenchmark.java | 17 +-
.../benchmark/query/SqlExpressionBenchmark.java | 7 -
.../benchmark/query/SqlNestedDataBenchmark.java | 8 +-
.../movingaverage/MovingAverageQueryTest.java | 13 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 2 +-
.../kafka/supervisor/KafkaSupervisorSpec.java | 2 +-
.../kafka/supervisor/KafkaSupervisorSpecTest.java | 2 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 2 +-
.../kinesis/supervisor/KinesisSupervisorSpec.java | 2 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 2 +-
.../supervisor/SeekableStreamSupervisor.java | 4 +-
.../supervisor/SeekableStreamSupervisorSpec.java | 2 +-
.../SeekableStreamSupervisorSpecTest.java | 2 +-
.../SeekableStreamSupervisorStateTest.java | 34 ++-
.../apache/druid/testsEx/config/Initializer.java | 4 +-
.../org/apache/druid/guice/ConfigProvider.java | 6 +-
.../common/concurrent/ExecutorServiceConfig.java | 57 -----
.../util/metrics/AllocationMetricCollector.java | 2 +-
.../java/util/metrics/BasicMonitorScheduler.java | 4 +-
.../metrics/ClockDriftSafeMonitorScheduler.java | 4 +-
.../util}/metrics/DruidMonitorSchedulerConfig.java | 15 +-
.../druid/java/util/metrics/MonitorScheduler.java | 6 +-
.../java/util/metrics/MonitorSchedulerConfig.java | 33 ---
.../druid/query/DruidProcessingBufferConfig.java | 78 +++++++
.../apache/druid/query/DruidProcessingConfig.java | 232 +++++++++------------
.../druid/query/DruidProcessingIndexesConfig.java | 67 ++++++
.../aggregation/first/StringFirstLastUtils.java | 1 +
.../java/org/apache/druid/segment/IndexIO.java | 5 +
.../apache/druid/segment/column/ColumnConfig.java | 20 +-
.../util/metrics/BasicMonitorSchedulerTest.java | 4 +-
.../ClockDriftSafeMonitorSchedulerTest.java | 18 +-
.../java/util/metrics/MonitorSchedulerTest.java | 4 +-
.../druid/query/DruidProcessingConfigTest.java | 98 ++++-----
.../druid/client/CachingClusteredClient.java | 22 +-
.../apache/druid/guice/BrokerProcessingModule.java | 13 +-
.../apache/druid/guice/DruidProcessingModule.java | 26 +--
... => LegacyBrokerParallelMergeConfigModule.java} | 15 +-
.../apache/druid/guice/RouterProcessingModule.java | 15 +-
.../org/apache/druid/guice/StorageNodeModule.java | 1 -
.../druid/initialization/CoreInjectorBuilder.java | 2 -
.../druid/query/BrokerParallelMergeConfig.java | 215 +++++++++++++++++++
.../query/LegacyBrokerParallelMergeConfig.java | 74 +++++++
.../segment/realtime/DbSegmentPublisherConfig.java | 28 ---
.../apache/druid/server/metrics/MetricsModule.java | 1 +
.../CachingClusteredClientFunctionalityTest.java | 19 +-
.../client/CachingClusteredClientPerfTest.java | 6 +-
.../druid/client/CachingClusteredClientTest.java | 21 +-
.../druid/guice/BrokerProcessingModuleTest.java | 95 +++++----
.../apache/druid/guice/QueryableModuleTest.java | 2 +-
.../QueryRunnerBasedOnClusteredClientTestBase.java | 8 +-
.../org/apache/druid/server/QueryStackTests.java | 39 ++--
.../main/java/org/apache/druid/cli/CliBroker.java | 2 +
.../java/org/apache/druid/cli/DumpSegment.java | 40 +---
.../org/apache/druid/cli/ValidateSegments.java | 40 +---
.../java/org/apache/druid/cli/DumpSegmentTest.java | 20 ++
.../org/apache/druid/cli/ValidateSegmentsTest.java | 117 +++++++++++
.../druid/sql/calcite/util/SqlTestFramework.java | 2 +-
57 files changed, 952 insertions(+), 628 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index 6c9c6aa3d4..e38ee8862d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
@@ -103,7 +104,6 @@ import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -258,12 +258,6 @@ public class CachingClusteredClientBenchmark
{
return numProcessingThreads;
}
-
- @Override
- public boolean useParallelMergePool()
- {
- return true;
- }
};
conglomerate = new DefaultQueryRunnerFactoryConglomerate(
@@ -339,10 +333,15 @@ public class CachingClusteredClientBenchmark
new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig(),
- processingConfig,
+ new BrokerParallelMergeConfig() {
+ @Override
+ public boolean useParallelMergePool()
+ {
+ return true;
+ }
+ },
forkJoinPool,
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
- JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index 7733281908..80d208c5a4 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -106,13 +106,6 @@ public class SqlExpressionBenchmark
{
return 1;
}
-
- @Override
- public boolean useParallelMergePoolConfigured()
- {
- return true;
- }
-
@Override
public String getFormatString()
{
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
index 98514512e9..da42aaeefc 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
@@ -112,13 +112,7 @@ public class SqlNestedDataBenchmark
{
return 1;
}
-
- @Override
- public boolean useParallelMergePoolConfigured()
- {
- return true;
- }
-
+
@Override
public String getFormatString()
{
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 2945bdb1e5..223048200f 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -54,7 +54,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -69,7 +69,6 @@ import org.apache.druid.query.movingaverage.test.TestConfig;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
@@ -363,17 +362,9 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
return 0L;
}
},
- new DruidProcessingConfig()
- {
- @Override
- public String getFormatString()
- {
- return null;
- }
- },
+ new BrokerParallelMergeConfig(),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
- new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())),
new NoopServiceEmitter()
);
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index a146e090b5..e9c6b835fd 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -55,8 +55,8 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 9337175378..5d84eeed1f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -33,9 +33,9 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 9ff0c0bd78..f8feeae9be 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -30,10 +30,10 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 5f1311085c..2ff8c5bf91 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -80,6 +80,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@@ -89,7 +90,6 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index eb9715f9c9..4aa60ea032 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -36,9 +36,9 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index e489061408..127683f8e4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -76,6 +76,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -85,7 +86,6 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index a72ba05eac..f8b41fce61 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4077,13 +4077,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
reportingExec.scheduleAtFixedRate(
this::emitLag,
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
- spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+ spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitNoticesQueueSize,
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
- spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+ spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 90b0b70563..b7d4ba2f27 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -35,9 +35,9 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFac
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import javax.annotation.Nullable;
import java.util.List;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 05e6401a8f..f0b17657d8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -55,13 +55,13 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index c592b9375f..46f86bacb4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -77,7 +78,6 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -102,6 +102,7 @@ import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -501,7 +502,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
- public Duration getEmitterPeriod()
+ public Duration getEmissionDuration()
{
return new Period("PT1S").toStandardDuration();
}
@@ -1062,6 +1063,29 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
Assert.assertEquals(0, emitter.getEvents().size());
}
+ @Test
+ public void testScheduleReporting()
+ {
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig();
+ EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(2);
+ ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class);
+ EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once();
+ EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2);
+
+ EasyMock.replay(executorService, spec);
+ final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor()
+ {
+ @Override
+ public LagStats computeLagStats()
+ {
+ return new LagStats(0, 0, 0);
+ }
+ };
+ supervisor.scheduleReporting(executorService);
+ EasyMock.verify(executorService, spec);
+ }
+
private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
{
List<Event> result = events.stream()
@@ -1098,7 +1122,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
@Override
- public Duration getEmitterPeriod()
+ public Duration getEmissionDuration()
{
return new Period("PT1S").toStandardDuration();
}
@@ -1608,13 +1632,13 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
reportingExec.scheduleAtFixedRate(
this::emitLag,
ioConfig.getStartDelay().getMillis(),
- spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+ spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitNoticesQueueSize,
ioConfig.getStartDelay().getMillis(),
- spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+ spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
index 7a2eae93e1..7931115c6e 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
@@ -35,9 +35,9 @@ import org.apache.druid.curator.discovery.DiscoveryModule;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.AnnouncerModule;
-import org.apache.druid.guice.DruidProcessingConfigModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.SQLMetadataStorageDruidModule;
@@ -496,7 +496,7 @@ public class Initializer
new AnnouncerModule(),
new DiscoveryModule(),
// Dependencies from other modules
- new DruidProcessingConfigModule(),
+ new LegacyBrokerParallelMergeConfigModule(),
// Dependencies from other modules
new StorageNodeModule(),
diff --git a/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java b/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
index 538f2a6137..66cc6261c9 100644
--- a/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
+++ b/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
@@ -31,6 +31,7 @@ import java.util.Map;
/**
*
*/
+@Deprecated
public class ConfigProvider<T> implements Provider<T>
{
private static final Logger log = new Logger(ConfigProvider.class);
@@ -40,11 +41,6 @@ public class ConfigProvider<T> implements Provider<T>
binder.bind(clazz).toProvider(of(clazz)).in(LazySingleton.class);
}
- public static <T> void bind(Binder binder, Class<T> clazz, Map<String, String> replacements)
- {
- binder.bind(clazz).toProvider(of(clazz, replacements)).in(LazySingleton.class);
- }
-
public static <T> Provider<T> of(Class<T> clazz)
{
return of(clazz, null);
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java
deleted file mode 100644
index 6d6327fbdd..0000000000
--- a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.common.concurrent;
-
-import org.apache.druid.utils.JvmUtils;
-import org.skife.config.Config;
-import org.skife.config.Default;
-
-/**
- */
-public abstract class ExecutorServiceConfig
-{
- public static final int DEFAULT_NUM_THREADS = -1;
-
- @Config(value = "${base_path}.formatString")
- @Default("processing-%s")
- public abstract String getFormatString();
-
- public int getNumThreads()
- {
- int numThreadsConfigured = getNumThreadsConfigured();
- if (numThreadsConfigured != DEFAULT_NUM_THREADS) {
- return numThreadsConfigured;
- } else {
- return Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1);
- }
- }
-
- /**
- * Returns the number of threads _explicitly_ configured, or -1 if it is not explicitly configured, that is not
- * a valid number of threads. To get the configured value or the default (valid) number, use {@link #getNumThreads()}.
- * This method exists for ability to distinguish between the default value set when there is no explicit config, and
- * an explicitly configured value.
- */
- @Config(value = "${base_path}.numThreads")
- public int getNumThreadsConfigured()
- {
- return DEFAULT_NUM_THREADS;
- }
-}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
index a0a9d7f645..1ccb910562 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
@@ -52,7 +52,7 @@ class AllocationMetricCollector
* Tests show the call to getThreadAllocatedBytes for a single thread ID out of 500 threads running takes around
* 9000 ns (in the worst case), which for 500 IDs should take 500*9000/1000/1000 = 4.5 ms to the max.
* AllocationMetricCollector takes linear time to calculate delta, for 500 threads it's negligible.
- * See the default emitting period {@link MonitorSchedulerConfig#getEmitterPeriod}.
+ * See the default emitting period {@link DruidMonitorSchedulerConfig#getEmissionDuration}.
*
* @return all threads summed allocated bytes delta
*/
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
index 0d5c07d49b..c57bbc0dd1 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
@@ -34,7 +34,7 @@ public class BasicMonitorScheduler extends MonitorScheduler
private final ScheduledExecutorService exec;
public BasicMonitorScheduler(
- MonitorSchedulerConfig config,
+ DruidMonitorSchedulerConfig config,
ServiceEmitter emitter,
List<Monitor> monitors,
ScheduledExecutorService exec
@@ -50,7 +50,7 @@ public class BasicMonitorScheduler extends MonitorScheduler
monitor.start();
ScheduledExecutors.scheduleAtFixedRate(
exec,
- getConfig().getEmitterPeriod(),
+ getConfig().getEmissionDuration(),
() -> {
if (hasMonitor(monitor) && monitor.monitor(getEmitter())) {
return Signal.REPEAT;
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
index 6fcb244069..68b7bacedd 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
@@ -41,7 +41,7 @@ public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
private final ExecutorService monitorRunner;
public ClockDriftSafeMonitorScheduler(
- MonitorSchedulerConfig config,
+ DruidMonitorSchedulerConfig config,
ServiceEmitter emitter,
List<Monitor> monitors,
CronScheduler monitorScheduler,
@@ -57,7 +57,7 @@ public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
void startMonitor(final Monitor monitor)
{
monitor.start();
- long rate = getConfig().getEmitterPeriod().getMillis();
+ long rate = getConfig().getEmissionDuration().getMillis();
final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
Future<?> future = monitorScheduler.scheduleAtFixedRate(
rate,
diff --git a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
similarity index 76%
rename from server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
rename to processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
index 0e242b1244..96a554e6ef 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
@@ -17,17 +17,15 @@
* under the License.
*/
-package org.apache.druid.server.metrics;
+package org.apache.druid.java.util.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
-import org.apache.druid.java.util.metrics.MonitorSchedulerConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
/**
*/
-public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
+public class DruidMonitorSchedulerConfig
{
@JsonProperty
private String schedulerClassName = BasicMonitorScheduler.class.getName();
@@ -40,14 +38,7 @@ public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
return schedulerClassName;
}
- @JsonProperty
- public Period getEmissionPeriod()
- {
- return emissionPeriod;
- }
-
- @Override
- public Duration getEmitterPeriod()
+ public Duration getEmissionDuration()
{
return emissionPeriod.toStandardDuration();
}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 9e159182a1..47bb37d161 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -39,7 +39,7 @@ public abstract class MonitorScheduler
{
private static final Logger log = new Logger(MonitorScheduler.class);
- private final MonitorSchedulerConfig config;
+ private final DruidMonitorSchedulerConfig config;
private final ServiceEmitter emitter;
private final Set<Monitor> monitors;
private final Object lock = new Object();
@@ -47,7 +47,7 @@ public abstract class MonitorScheduler
private volatile boolean started = false;
MonitorScheduler(
- MonitorSchedulerConfig config,
+ DruidMonitorSchedulerConfig config,
ServiceEmitter emitter,
List<Monitor> monitors
)
@@ -135,7 +135,7 @@ public abstract class MonitorScheduler
}
}
- MonitorSchedulerConfig getConfig()
+ DruidMonitorSchedulerConfig getConfig()
{
return config;
}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java
deleted file mode 100644
index 8d0e165e2a..0000000000
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.metrics;
-
-import org.joda.time.Duration;
-import org.skife.config.Config;
-import org.skife.config.Default;
-
-/**
- */
-public abstract class MonitorSchedulerConfig
-{
- @Config({"org.apache.druid.java.util.metrics.emitter.period", "com.metamx.druid.emitter.period"})
- @Default("PT60s")
- public abstract Duration getEmitterPeriod();
-}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java
new file mode 100644
index 0000000000..ac6b63c8ad
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
+import javax.annotation.Nullable;
+
+public class DruidProcessingBufferConfig
+{
+ public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1);
+ public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
+ public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;
+
+ @JsonProperty
+ private final HumanReadableBytes sizeBytes;
+
+ @JsonProperty
+ private final int poolCacheMaxCount;
+
+ @JsonProperty
+ private final int poolCacheInitialCount;
+
+ @JsonCreator
+ public DruidProcessingBufferConfig(
+ @JsonProperty("sizeBytes") @Nullable HumanReadableBytes sizeBytes,
+ @JsonProperty("poolCacheMaxCount") @Nullable Integer poolCacheMaxCount,
+ @JsonProperty("poolCacheInitialCount") @Nullable Integer poolCacheInitialCount
+ )
+ {
+ this.sizeBytes = Configs.valueOrDefault(sizeBytes, DEFAULT_PROCESSING_BUFFER_SIZE_BYTES);
+ this.poolCacheInitialCount = Configs.valueOrDefault(
+ poolCacheInitialCount,
+ DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL
+ );
+ this.poolCacheMaxCount = Configs.valueOrDefault(poolCacheMaxCount, Integer.MAX_VALUE);
+ }
+
+ public DruidProcessingBufferConfig()
+ {
+ this(null, null, null);
+ }
+
+ public HumanReadableBytes getBufferSize()
+ {
+ return sizeBytes;
+ }
+
+ public int getPoolCacheMaxCount()
+ {
+ return poolCacheMaxCount;
+ }
+
+ public int getPoolCacheInitialCount()
+ {
+ return poolCacheInitialCount;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index 823c9e71ef..ca645d16e9 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -19,45 +19,82 @@
package org.apache.druid.query;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
-import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.utils.JvmUtils;
-import org.skife.config.Config;
+import javax.annotation.Nullable;
import java.util.concurrent.atomic.AtomicReference;
-public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig
+public class DruidProcessingConfig implements ColumnConfig
{
private static final Logger log = new Logger(DruidProcessingConfig.class);
- public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
- public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1);
- public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
- public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
- public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;
-
- private AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
-
- @Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"})
- public HumanReadableBytes intermediateComputeSizeBytesConfigured()
- {
- return DEFAULT_PROCESSING_BUFFER_SIZE_BYTES;
- }
-
- public int intermediateComputeSizeBytes()
- {
- HumanReadableBytes sizeBytesConfigured = intermediateComputeSizeBytesConfigured();
- if (!DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) {
+ @JsonProperty
+ private final String formatString;
+ @JsonProperty
+ private final int numThreads;
+ @JsonProperty
+ private final int numMergeBuffers;
+ @JsonProperty
+ private final boolean fifo;
+ @JsonProperty
+ private final String tmpDir;
+ @JsonProperty
+ private final DruidProcessingBufferConfig buffer;
+ @JsonProperty
+ private final DruidProcessingIndexesConfig indexes;
+ private final AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
+ private final boolean numThreadsConfigured;
+ private final boolean numMergeBuffersConfigured;
+
+ @JsonCreator
+ public DruidProcessingConfig(
+ @JsonProperty("formatString") @Nullable String formatString,
+ @JsonProperty("numThreads") @Nullable Integer numThreads,
+ @JsonProperty("numMergeBuffers") @Nullable Integer numMergeBuffers,
+ @JsonProperty("fifo") @Nullable Boolean fifo,
+ @JsonProperty("tmpDir") @Nullable String tmpDir,
+ @JsonProperty("buffer") DruidProcessingBufferConfig buffer,
+ @JsonProperty("indexes") DruidProcessingIndexesConfig indexes
+ )
+ {
+ this.formatString = Configs.valueOrDefault(formatString, "processing-%s");
+ this.numThreads = Configs.valueOrDefault(
+ numThreads,
+ Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1)
+ );
+ this.numMergeBuffers = Configs.valueOrDefault(numMergeBuffers, Math.max(2, this.numThreads / 4));
+ this.fifo = fifo == null || fifo;
+ this.tmpDir = Configs.valueOrDefault(tmpDir, System.getProperty("java.io.tmpdir"));
+ this.buffer = Configs.valueOrDefault(buffer, new DruidProcessingBufferConfig());
+ this.indexes = Configs.valueOrDefault(indexes, new DruidProcessingIndexesConfig());
+
+ this.numThreadsConfigured = numThreads != null;
+ this.numMergeBuffersConfigured = numMergeBuffers != null;
+ initializeBufferSize();
+ }
+
+ @VisibleForTesting
+ public DruidProcessingConfig()
+ {
+ this(null, null, null, null, null, null, null);
+ }
+
+ private void initializeBufferSize()
+ {
+ HumanReadableBytes sizeBytesConfigured = this.buffer.getBufferSize();
+ if (!DruidProcessingBufferConfig.DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) {
if (sizeBytesConfigured.getBytes() > Integer.MAX_VALUE) {
throw new IAE("druid.processing.buffer.sizeBytes must be less than 2GiB");
}
- return sizeBytesConfigured.getBytesInInt();
- } else if (computedBufferSizeBytes.get() != null) {
- return computedBufferSizeBytes.get();
+ computedBufferSizeBytes.set(sizeBytesConfigured.getBytesInInt());
}
long directSizeBytes;
@@ -70,173 +107,90 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
}
catch (UnsupportedOperationException e) {
// max direct memory defaults to max heap size on recent JDK version, unless set explicitly
- directSizeBytes = computeMaxMemoryFromMaxHeapSize();
+ directSizeBytes = Runtime.getRuntime().maxMemory() / 4;
log.info("Using up to [%,d] bytes of direct memory for computation buffers.", directSizeBytes);
}
- int numProcessingThreads = getNumThreads();
- int numMergeBuffers = getNumMergeBuffers();
- int totalNumBuffers = numMergeBuffers + numProcessingThreads;
+ int totalNumBuffers = this.numMergeBuffers + this.numThreads;
int sizePerBuffer = (int) ((double) directSizeBytes / (double) (totalNumBuffers + 1));
- final int computedSizePerBuffer = Math.min(sizePerBuffer, MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES);
+ final int computedSizePerBuffer = Math.min(
+ sizePerBuffer,
+ DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES
+ );
if (computedBufferSizeBytes.compareAndSet(null, computedSizePerBuffer)) {
log.info(
"Auto sizing buffers to [%,d] bytes each for [%,d] processing and [%,d] merge buffers. "
+ "If you run out of direct memory, you may need to set these parameters explicitly using the guidelines at "
+ "https://druid.apache.org/docs/latest/operations/basic-cluster-tuning.html#processing-threads-buffers.",
computedSizePerBuffer,
- numProcessingThreads,
- numMergeBuffers
+ this.numThreads,
+ this.numMergeBuffers
);
}
- return computedSizePerBuffer;
- }
-
- public static long computeMaxMemoryFromMaxHeapSize()
- {
- return Runtime.getRuntime().maxMemory() / 4;
}
- @Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"})
- public int poolCacheMaxCount()
+ public String getFormatString()
{
- return Integer.MAX_VALUE;
+ return formatString;
}
- @Config({
- "druid.computation.buffer.poolCacheInitialCount",
- "${base_path}.buffer.poolCacheInitialCount"
- })
- public int getNumInitalBuffersForIntermediatePool()
+ public int getNumThreads()
{
- return DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL;
- }
-
- @Override
- @Config(value = "${base_path}.numThreads")
- public int getNumThreadsConfigured()
- {
- return DEFAULT_NUM_THREADS;
+ return numThreads;
}
public int getNumMergeBuffers()
{
- int numMergeBuffersConfigured = getNumMergeBuffersConfigured();
- if (numMergeBuffersConfigured != DEFAULT_NUM_MERGE_BUFFERS) {
- return numMergeBuffersConfigured;
- } else {
- return Math.max(2, getNumThreads() / 4);
- }
- }
-
- /**
- * Returns the number of merge buffers _explicitly_ configured, or -1 if it is not explicitly configured, that is not
- * a valid number of buffers. To get the configured value or the default (valid) number, use {@link
- * #getNumMergeBuffers()}. This method exists for ability to distinguish between the default value set when there is
- * no explicit config, and an explicitly configured value.
- */
- @Config("${base_path}.numMergeBuffers")
- public int getNumMergeBuffersConfigured()
- {
- return DEFAULT_NUM_MERGE_BUFFERS;
- }
-
- @Override
- @Config(value = "${base_path}.indexes.skipValueRangeIndexScale")
- public double skipValueRangeIndexScale()
- {
- return ColumnConfig.super.skipValueRangeIndexScale();
- }
-
- @Override
- @Config(value = "${base_path}.indexes.skipValuePredicateIndexScale")
- public double skipValuePredicateIndexScale()
- {
- return ColumnConfig.super.skipValuePredicateIndexScale();
+ return numMergeBuffers;
}
- @Config(value = "${base_path}.fifo")
public boolean isFifo()
{
- return true;
+ return fifo;
}
- @Config(value = "${base_path}.tmpDir")
public String getTmpDir()
{
- return System.getProperty("java.io.tmpdir");
+ return tmpDir;
}
- @Config(value = "${base_path}.merge.useParallelMergePool")
- public boolean useParallelMergePoolConfigured()
+ public int intermediateComputeSizeBytes()
{
- return true;
- }
- public boolean useParallelMergePool()
- {
- final boolean useParallelMergePoolConfigured = useParallelMergePoolConfigured();
- final int parallelism = getMergePoolParallelism();
- // need at least 3 to do 2 layer merge
- if (parallelism > 2) {
- return useParallelMergePoolConfigured;
- }
- if (useParallelMergePoolConfigured) {
- log.debug(
- "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s",
- parallelism
- );
- }
- return false;
- }
-
- @Config(value = "${base_path}.merge.pool.parallelism")
- public int getMergePoolParallelismConfigured()
- {
- return DEFAULT_NUM_THREADS;
+ return computedBufferSizeBytes.get();
}
- public int getMergePoolParallelism()
+ public int poolCacheMaxCount()
{
- int poolParallelismConfigured = getMergePoolParallelismConfigured();
- if (poolParallelismConfigured != DEFAULT_NUM_THREADS) {
- return poolParallelismConfigured;
- } else {
- // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5
- return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
- }
+ return buffer.getPoolCacheMaxCount();
}
- @Config(value = "${base_path}.merge.pool.awaitShutdownMillis")
- public long getMergePoolAwaitShutdownMillis()
+ public int getNumInitalBuffersForIntermediatePool()
{
- return DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS;
+ return buffer.getPoolCacheInitialCount();
}
- @Config(value = "${base_path}.merge.pool.defaultMaxQueryParallelism")
- public int getMergePoolDefaultMaxQueryParallelism()
+ @Override
+ public double skipValueRangeIndexScale()
{
- // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores
- return (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1);
+ return indexes.getSkipValueRangeIndexScale();
}
- @Config(value = "${base_path}.merge.task.targetRunTimeMillis")
- public int getMergePoolTargetTaskRunTimeMillis()
+ @Override
+ public double skipValuePredicateIndexScale()
{
- return ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS;
+ return indexes.getSkipValuePredicateIndexScale();
}
- @Config(value = "${base_path}.merge.task.initialYieldNumRows")
- public int getMergePoolTaskInitialYieldRows()
+ public boolean isNumThreadsConfigured()
{
- return ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS;
+ return numThreadsConfigured;
}
- @Config(value = "${base_path}.merge.task.smallBatchNumRows")
- public int getMergePoolSmallBatchRows()
+ public boolean isNumMergeBuffersConfigured()
{
- return ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS;
+ return numMergeBuffersConfigured;
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java
new file mode 100644
index 0000000000..6afa20418e
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.segment.column.ColumnConfig;
+
+import javax.annotation.Nullable;
+
+public class DruidProcessingIndexesConfig
+{
+ @JsonProperty
+ private final double skipValueRangeIndexScale;
+
+ @JsonProperty
+ private final double skipValuePredicateIndexScale;
+
+ @JsonCreator
+ public DruidProcessingIndexesConfig(
+ @JsonProperty("skipValueRangeIndexScale") @Nullable Double skipValueRangeIndexScale,
+ @JsonProperty("skipValuePredicateIndexScale") @Nullable Double skipValuePredicateIndexScale
+ )
+ {
+ this.skipValueRangeIndexScale = Configs.valueOrDefault(
+ skipValueRangeIndexScale,
+ ColumnConfig.DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE
+ );
+ this.skipValuePredicateIndexScale = Configs.valueOrDefault(
+ skipValuePredicateIndexScale,
+ ColumnConfig.DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE
+ );
+ }
+
+ public DruidProcessingIndexesConfig()
+ {
+ this(null, null);
+ }
+
+ public double getSkipValueRangeIndexScale()
+ {
+ return skipValueRangeIndexScale;
+ }
+
+ public double getSkipValuePredicateIndexScale()
+ {
+ return skipValuePredicateIndexScale;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 3a9b8818cd..b61a78a7c9 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -159,6 +159,7 @@ public class StringFirstLastUtils
Long timeValue = copyBuffer.getLong();
int stringSizeBytes = copyBuffer.getInt();
+
if (stringSizeBytes >= 0) {
byte[] valueBytes = new byte[stringSizeBytes];
copyBuffer.get(valueBytes, 0, stringSizeBytes);
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index f2d57a5517..dd0ac9ab11 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -79,6 +79,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -276,6 +277,10 @@ public class IndexIO
throw notEqualValidationException(dim1Name, vals1, vals2);
}
}
+ } else if (vals1 instanceof Object[]) {
+ if (!Arrays.deepEquals((Object[]) vals1, (Object[]) vals2)) {
+ throw notEqualValidationException(dim1Name, vals1, vals2);
+ }
} else {
if (!Objects.equals(vals1, vals2)) {
throw notEqualValidationException(dim1Name, vals1, vals2);
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
index ae5ca7ff8a..62934d3d34 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
@@ -26,9 +26,16 @@ import org.apache.druid.segment.index.semantic.NumericRangeIndexes;
public interface ColumnConfig
{
- ColumnConfig DEFAULT = new ColumnConfig()
- {
- };
+ /**
+ * this value was chosen testing bound filters on double columns with a variety of ranges at which this ratio
+ * of number of bitmaps compared to total number of rows appeared to be around the threshold where indexes stopped
+ * performing consistently faster than a full scan + value matcher
+ */
+ double DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE = 0.08;
+
+ double DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE = 0.08;
+
+ ColumnConfig DEFAULT = new ColumnConfig() {};
ColumnConfig ALWAYS_USE_INDEXES = new ColumnConfig()
{
@@ -73,10 +80,7 @@ public interface ColumnConfig
*/
default double skipValueRangeIndexScale()
{
- // this value was chosen testing bound filters on double columns with a variety of ranges at which this ratio
- // of number of bitmaps compared to total number of rows appeared to be around the threshold where indexes stopped
- // performing consistently faster than a full scan + value matcher
- return 0.08;
+ return DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE;
}
/**
@@ -109,6 +113,6 @@ public interface ColumnConfig
*/
default double skipValuePredicateIndexScale()
{
- return 0.08;
+ return DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE;
}
}
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
index 9b87e1cd7b..ceac1e5564 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
@@ -32,10 +32,10 @@ import java.util.concurrent.ScheduledExecutorService;
public class BasicMonitorSchedulerTest
{
- private final MonitorSchedulerConfig config = new MonitorSchedulerConfig()
+ private final DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig()
{
@Override
- public Duration getEmitterPeriod()
+ public Duration getEmissionDuration()
{
return Duration.millis(5);
}
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
index d7ec354ff1..712fcbfbb6 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
@@ -93,7 +93,7 @@ public class ClockDriftSafeMonitorSchedulerTest
ExecutorService executor = Mockito.mock(ExecutorService.class);
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
- Mockito.mock(MonitorSchedulerConfig.class),
+ Mockito.mock(DruidMonitorSchedulerConfig.class),
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor1, monitor2),
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
@@ -153,8 +153,8 @@ public class ClockDriftSafeMonitorSchedulerTest
Monitor monitor = Mockito.mock(Monitor.class);
- MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
- Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+ DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+ Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
@@ -217,8 +217,8 @@ public class ClockDriftSafeMonitorSchedulerTest
Monitor monitor = Mockito.mock(Monitor.class);
- MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
- Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+ DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+ Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
@@ -248,8 +248,8 @@ public class ClockDriftSafeMonitorSchedulerTest
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class)))
.thenThrow(new RuntimeException("Test throwing exception while monitoring"));
- MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
- Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+ DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+ Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean monitorResultHolder = new AtomicBoolean(false);
@@ -310,8 +310,8 @@ public class ClockDriftSafeMonitorSchedulerTest
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Monitor monitor = Mockito.mock(Monitor.class);
- MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
- Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+ DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+ Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
CountDownLatch latch = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Future<?>>()
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
index 40afa8349b..9f9a67270b 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -34,10 +34,10 @@ public class MonitorSchedulerTest
@Test
public void testMonitorAndStopOnRemove() throws IOException
{
- MonitorSchedulerConfig infiniteFlushDelayConfig = new MonitorSchedulerConfig()
+ DruidMonitorSchedulerConfig infiniteFlushDelayConfig = new DruidMonitorSchedulerConfig()
{
@Override
- public Duration getEmitterPeriod()
+ public Duration getEmissionDuration()
{
return Duration.millis(Long.MAX_VALUE);
}
diff --git a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
index 8d9f713728..e19fbf7554 100644
--- a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
@@ -19,22 +19,18 @@
package org.apache.druid.query;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Guice;
import com.google.inject.Injector;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.config.Config;
+import com.google.inject.ProvisionException;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.utils.JvmUtils;
import org.apache.druid.utils.RuntimeInfo;
-import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import org.skife.config.ConfigurationObjectFactory;
-import java.util.Map;
import java.util.Properties;
/**
@@ -46,47 +42,15 @@ public class DruidProcessingConfigTest
private static final long DIRECT_SIZE = BUFFER_SIZE * (3L + 2L + 1L);
private static final long HEAP_SIZE = BUFFER_SIZE * 2L;
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize)
- {
- return makeInjector(numProcessors, directMemorySize, heapSize, new Properties(), null);
- }
-
@AfterClass
public static void teardown()
{
JvmUtils.resetTestsToDefaultRuntimeInfo();
}
- private static Injector makeInjector(
- int numProcessors,
- long directMemorySize,
- long heapSize,
- Properties props,
- Map<String, String> replacements
- )
- {
- return Guice.createInjector(
- binder -> {
- binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize));
- binder.requestStaticInjection(JvmUtils.class);
- ConfigurationObjectFactory factory = Config.createFactory(props);
- DruidProcessingConfig config;
- if (replacements != null) {
- config = factory.buildWithReplacements(
- DruidProcessingConfig.class,
- replacements
- );
- } else {
- config = factory.build(DruidProcessingConfig.class);
- }
- binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
- binder.bind(DruidProcessingConfig.class).toInstance(config);
- }
- );
- }
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testDefaultsMultiProcessor()
@@ -124,7 +88,7 @@ public class DruidProcessingConfigTest
DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
Assert.assertEquals(
- DruidProcessingConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES,
+ DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES,
config.intermediateComputeSizeBytes()
);
}
@@ -144,8 +108,7 @@ public class DruidProcessingConfigTest
NUM_PROCESSORS,
DIRECT_SIZE,
HEAP_SIZE,
- props,
- ImmutableMap.of("base_path", "druid.processing")
+ props
);
DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
@@ -164,14 +127,19 @@ public class DruidProcessingConfigTest
Properties props = new Properties();
props.setProperty("druid.processing.buffer.sizeBytes", "-1");
- expectedException.expectCause(CoreMatchers.isA(IAE.class));
-
Injector injector = makeInjector(
NUM_PROCESSORS,
DIRECT_SIZE,
HEAP_SIZE,
- props,
- ImmutableMap.of("base_path", "druid.processing")
+ props
+ );
+ Throwable t = Assert.assertThrows(
+ ProvisionException.class,
+ () -> injector.getInstance(DruidProcessingConfig.class)
+ );
+ Assert.assertTrue(
+ t.getMessage()
+ .contains("Cannot construct instance of `org.apache.druid.java.util.common.HumanReadableBytes`, problem: Invalid format of number: -1. Negative value is not allowed.")
);
}
@@ -184,13 +152,35 @@ public class DruidProcessingConfigTest
NUM_PROCESSORS,
DIRECT_SIZE,
HEAP_SIZE,
- props,
- ImmutableMap.of("base_path", "druid.processing")
+ props
+ );
+ Throwable t = Assert.assertThrows(
+ ProvisionException.class,
+ () -> injector.getInstance(DruidProcessingConfig.class)
);
- DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
- expectedException.expectMessage("druid.processing.buffer.sizeBytes must be less than 2GiB");
- config.intermediateComputeSizeBytes();
+ Assert.assertTrue(t.getMessage().contains("druid.processing.buffer.sizeBytes must be less than 2GiB"));
+ }
+
+  /**
+   * Convenience overload that builds the test injector with an empty set of runtime properties, so every
+   * {@code druid.processing} setting takes its default.
+   */
+  private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize)
+  {
+    final Properties noOverrides = new Properties();
+    return makeInjector(numProcessors, directMemorySize, heapSize, noOverrides);
+  }
+  /**
+   * Builds a startup injector for the test: the supplied properties are handed to the builder, a
+   * {@link MockRuntimeInfo} reporting the given processor count and memory sizes is bound as the
+   * {@link RuntimeInfo}, static injection is requested for {@link JvmUtils}, and {@link DruidProcessingConfig}
+   * is bound as JSON configuration under the {@code druid.processing} prefix.
+   */
+  private static Injector makeInjector(
+      int numProcessors,
+      long directMemorySize,
+      long heapSize,
+      Properties props
+  )
+  {
+    return new StartupInjectorBuilder()
+        .withProperties(props)
+        .add(
+            binder -> {
+              binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize));
+              binder.requestStaticInjection(JvmUtils.class);
+              JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+            }
+        )
+        .build();
   }
public static class MockRuntimeInfo extends RuntimeInfo
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 5c0e5efb7e..19df276344 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -55,9 +55,9 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.CacheStrategy;
-import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
@@ -75,7 +75,6 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -124,10 +123,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final CachePopulator cachePopulator;
private final CacheConfig cacheConfig;
private final DruidHttpClientConfig httpClientConfig;
- private final DruidProcessingConfig processingConfig;
+ private final BrokerParallelMergeConfig parallelMergeConfig;
private final ForkJoinPool pool;
private final QueryScheduler scheduler;
- private final JoinableFactoryWrapper joinableFactoryWrapper;
private final ServiceEmitter emitter;
@Inject
@@ -139,10 +137,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
CachePopulator cachePopulator,
CacheConfig cacheConfig,
@Client DruidHttpClientConfig httpClientConfig,
- DruidProcessingConfig processingConfig,
+ BrokerParallelMergeConfig parallelMergeConfig,
@Merging ForkJoinPool pool,
QueryScheduler scheduler,
- JoinableFactoryWrapper joinableFactoryWrapper,
ServiceEmitter emitter
)
{
@@ -153,10 +150,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
this.cachePopulator = cachePopulator;
this.cacheConfig = cacheConfig;
this.httpClientConfig = httpClientConfig;
- this.processingConfig = processingConfig;
+ this.parallelMergeConfig = parallelMergeConfig;
this.pool = pool;
this.scheduler = scheduler;
- this.joinableFactoryWrapper = joinableFactoryWrapper;
this.emitter = emitter;
if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
@@ -386,7 +382,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
{
BinaryOperator<T> mergeFn = toolChest.createMergeFn(query);
final QueryContext queryContext = query.context();
- if (processingConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) {
+ if (parallelMergeConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) {
return new ParallelMergeCombiningSequence<>(
pool,
sequencesByInterval,
@@ -395,10 +391,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
queryContext.hasTimeout(),
queryContext.getTimeout(),
queryContext.getPriority(),
- queryContext.getParallelMergeParallelism(processingConfig.getMergePoolDefaultMaxQueryParallelism()),
- queryContext.getParallelMergeInitialYieldRows(processingConfig.getMergePoolTaskInitialYieldRows()),
- queryContext.getParallelMergeSmallBatchRows(processingConfig.getMergePoolSmallBatchRows()),
- processingConfig.getMergePoolTargetTaskRunTimeMillis(),
+ queryContext.getParallelMergeParallelism(parallelMergeConfig.getDefaultMaxQueryParallelism()),
+ queryContext.getParallelMergeInitialYieldRows(parallelMergeConfig.getInitialYieldNumRows()),
+ queryContext.getParallelMergeSmallBatchRows(parallelMergeConfig.getSmallBatchNumRows()),
+ parallelMergeConfig.getTargetRunTimeMillis(),
reportMetrics -> {
QueryMetrics<?> queryMetrics = queryPlus.getQueryMetrics();
if (queryMetrics != null) {
diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index d5f18dd178..77572bae47 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -39,13 +39,14 @@ import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.offheap.OffheapBufferGenerator;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.utils.JvmUtils;
@@ -67,7 +68,9 @@ public class BrokerProcessingModule implements Module
@Override
public void configure(Binder binder)
{
- binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
+ JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class);
+ JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+ binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
MetricsModule.register(binder, ExecutorServiceMonitor.class);
}
@@ -133,14 +136,14 @@ public class BrokerProcessingModule implements Module
@Provides
@ManageLifecycle
- public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config)
+ public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(BrokerParallelMergeConfig config)
{
return new LifecycleForkJoinPoolProvider(
- config.getMergePoolParallelism(),
+ config.getParallelism(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
true,
- config.getMergePoolAwaitShutdownMillis()
+ config.getAwaitShutdownMillis()
);
}
diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index b02ae366f5..56a0fd0ede 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -38,7 +38,6 @@ import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.offheap.OffheapBufferGenerator;
@@ -47,13 +46,13 @@ import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.MetricsEmittingQueryProcessingPool;
import org.apache.druid.query.PrioritizedExecutorService;
import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.utils.JvmUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
/**
*/
@@ -64,7 +63,8 @@ public class DruidProcessingModule implements Module
@Override
public void configure(Binder binder)
{
- binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
+ JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+ binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
MetricsModule.register(binder, ExecutorServiceMonitor.class);
}
@@ -135,26 +135,6 @@ public class DruidProcessingModule implements Module
);
}
- @Provides
- @ManageLifecycle
- public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config)
- {
- return new LifecycleForkJoinPoolProvider(
- config.getMergePoolParallelism(),
- ForkJoinPool.defaultForkJoinWorkerThreadFactory,
- (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
- true,
- config.getMergePoolAwaitShutdownMillis()
- );
- }
-
- @Provides
- @Merging
- public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider poolProvider)
- {
- return poolProvider.getPool();
- }
-
private void verifyDirectMemory(DruidProcessingConfig config)
{
try {
diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java
similarity index 58%
rename from server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java
rename to server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java
index 7e83817de2..3d44cf23f7 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java
+++ b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java
@@ -19,17 +19,24 @@
package org.apache.druid.guice;
-import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
-import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.LegacyBrokerParallelMergeConfig;
-public class DruidProcessingConfigModule implements Module
+/**
+ * Backwards compatibility for runtime.properties for Druid 27 and older to make deprecated config paths of
+ * {@link LegacyBrokerParallelMergeConfig} still work for Druid 28.
+ * {@link org.apache.druid.query.BrokerParallelMergeConfig} has replaced these configs, and will warn when these
+ * deprecated paths are configured. This module should be removed in Druid 29, along with
+ * {@link LegacyBrokerParallelMergeConfig} as well as the config-magic library that makes it work.
+ */
+@Deprecated
+public class LegacyBrokerParallelMergeConfigModule implements Module
{
@Override
public void configure(Binder binder)
{
- ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
+ // Registers the legacy config class through ConfigProvider. Unlike the binding this replaces, no "base_path"
+ // replacement map is passed here.
+ ConfigProvider.bind(binder, LegacyBrokerParallelMergeConfig.class);
}
}
diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
index 59af28a4f5..3b68289c6d 100644
--- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
@@ -29,12 +29,12 @@ import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.server.metrics.MetricsModule;
import java.nio.ByteBuffer;
@@ -53,7 +53,8 @@ public class RouterProcessingModule implements Module
@Override
public void configure(Binder binder)
{
- binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
+ JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+ binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
MetricsModule.register(binder, ExecutorServiceMonitor.class);
}
@@ -61,8 +62,8 @@ public class RouterProcessingModule implements Module
@ManageLifecycle
public QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig config)
{
- if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) {
- log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured());
+ if (config.isNumThreadsConfigured()) {
+ log.warn("numThreads[%d] configured, that is ignored on Router", config.getNumThreads());
}
return new ForwardingQueryProcessingPool(Execs.dummy());
}
@@ -80,10 +81,10 @@ public class RouterProcessingModule implements Module
@Merging
public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig config)
{
- if (config.getNumMergeBuffersConfigured() != DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS) {
- log.error(
+ if (config.isNumMergeBuffersConfigured()) {
+ log.warn(
"numMergeBuffers[%d] configured, that is ignored on Router",
- config.getNumMergeBuffersConfigured()
+ config.getNumMergeBuffers()
);
}
return DummyBlockingPool.instance();
diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
index 842abd0e53..cc4e0b1e33 100644
--- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
+++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
@@ -56,7 +56,6 @@ public class StorageNodeModule implements Module
JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
bindLocationSelectorStrategy(binder);
-
binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null));
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
}
diff --git a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
index de4f3abbc8..d27fdf75b1 100644
--- a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
+++ b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
@@ -26,7 +26,6 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.CoordinatorDiscoveryModule;
import org.apache.druid.guice.DruidInjectorBuilder;
-import org.apache.druid.guice.DruidProcessingConfigModule;
import org.apache.druid.guice.DruidSecondaryModule;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.ExtensionsModule;
@@ -112,7 +111,6 @@ public class CoreInjectorBuilder extends DruidInjectorBuilder
new MetricsModule(),
new SegmentWriteOutMediumModule(),
new ServerModule(),
- new DruidProcessingConfigModule(),
new StorageNodeModule(),
new JettyServerModule(),
new ExpressionModule(),
diff --git a/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java
new file mode 100644
index 0000000000..1f8d25a10c
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+
+/**
+ * JSON-based configuration for the Broker parallel merge pool.
+ *
+ * Every setting may be left unset. An unset setting first falls back to the matching deprecated path exposed by
+ * {@link LegacyBrokerParallelMergeConfig} (logging a deprecation warning when that path is used), and otherwise
+ * to a built-in default:
+ * <ul>
+ * <li>{@code parallelism}: {@code ceil(availableProcessors * 0.75)}</li>
+ * <li>{@code useParallelMergePool}: {@code true}, but always forced to {@code false} when the resolved
+ * parallelism is 2 or less</li>
+ * <li>{@code awaitShutdownMillis}: {@link #DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS}</li>
+ * <li>{@code defaultMaxQueryParallelism}: {@code max(availableProcessors * 0.5, 1)}, truncated to an int</li>
+ * <li>{@code targetRunTimeMillis}, {@code initialYieldNumRows}, {@code smallBatchNumRows}: the corresponding
+ * {@link ParallelMergeCombiningSequence} default constants</li>
+ * </ul>
+ */
+public class BrokerParallelMergeConfig
+{
+  private static final Logger LOG = new Logger(BrokerParallelMergeConfig.class);
+  public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
+
+  @JsonProperty
+  private final boolean useParallelMergePool;
+  @JsonProperty
+  private final int parallelism;
+  @JsonProperty
+  private final long awaitShutdownMillis;
+  @JsonProperty
+  private final int defaultMaxQueryParallelism;
+  @JsonProperty
+  private final int targetRunTimeMillis;
+  @JsonProperty
+  private final int initialYieldNumRows;
+  @JsonProperty
+  private final int smallBatchNumRows;
+
+  /**
+   * Resolves each setting from, in order of precedence: the explicit JSON property, the deprecated path on
+   * {@code oldConfig} (if {@code oldConfig} is non-null and the legacy getter returns non-null), and the
+   * built-in default.
+   *
+   * @param useParallelMergePool whether parallel merging is requested; null is treated as true. Ignored (the
+   *                             pool is disabled) when the resolved parallelism is 2 or less
+   * @param parallelism size of the merge pool, or null to use the legacy value or the default
+   * @param awaitShutdownMillis merge pool shutdown wait in milliseconds, or null for the legacy value or default
+   * @param defaultMaxQueryParallelism default per-query parallelism, or null for the legacy value or default
+   * @param targetRunTimeMillis target task run time in milliseconds, or null for the legacy value or default
+   * @param initialYieldNumRows initial yield row count, or null for the legacy value or default
+   * @param smallBatchNumRows small batch row count, or null for the legacy value or default
+   * @param oldConfig injected legacy config used as a fallback for unset values; may be null
+   */
+  @JsonCreator
+  public BrokerParallelMergeConfig(
+      @JsonProperty("useParallelMergePool") @Nullable Boolean useParallelMergePool,
+      @JsonProperty("parallelism") @Nullable Integer parallelism,
+      @JsonProperty("awaitShutdownMillis") @Nullable Long awaitShutdownMillis,
+      @JsonProperty("defaultMaxQueryParallelism") @Nullable Integer defaultMaxQueryParallelism,
+      @JsonProperty("targetRunTimeMillis") @Nullable Integer targetRunTimeMillis,
+      @JsonProperty("initialYieldNumRows") @Nullable Integer initialYieldNumRows,
+      @JsonProperty("smallBatchNumRows") @Nullable Integer smallBatchNumRows,
+      @JacksonInject LegacyBrokerParallelMergeConfig oldConfig
+  )
+  {
+    if (parallelism == null) {
+      if (oldConfig == null || oldConfig.getMergePoolParallelism() == null) {
+        // assume 2 hyper-threads per core, so that this value is probably by default the number
+        // of physical cores * 1.5
+        this.parallelism = (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.pool.parallelism",
+            "druid.processing.merge.parallelism"
+        );
+        this.parallelism = oldConfig.getMergePoolParallelism();
+      }
+    } else {
+      this.parallelism = parallelism;
+    }
+
+    // need at least 3 to do 2 layer merge
+    if (this.parallelism > 2) {
+      this.useParallelMergePool = useParallelMergePool == null || useParallelMergePool;
+    } else {
+      if (useParallelMergePool == null || useParallelMergePool) {
+        // Log the resolved field, not the constructor argument: the argument is null whenever the value was
+        // defaulted or taken from the legacy config, which would make this message print "null".
+        LOG.debug(
+            "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s",
+            this.parallelism
+        );
+      }
+      this.useParallelMergePool = false;
+    }
+
+    if (awaitShutdownMillis == null) {
+      if (oldConfig == null || oldConfig.getMergePoolAwaitShutdownMillis() == null) {
+        this.awaitShutdownMillis = DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.pool.awaitShutdownMillis",
+            "druid.processing.merge.awaitShutdownMillis"
+        );
+        this.awaitShutdownMillis = oldConfig.getMergePoolAwaitShutdownMillis();
+      }
+    } else {
+      this.awaitShutdownMillis = awaitShutdownMillis;
+    }
+
+    if (defaultMaxQueryParallelism == null) {
+      if (oldConfig == null || oldConfig.getMergePoolDefaultMaxQueryParallelism() == null) {
+        this.defaultMaxQueryParallelism = (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1);
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.pool.defaultMaxQueryParallelism",
+            "druid.processing.merge.defaultMaxQueryParallelism"
+        );
+        this.defaultMaxQueryParallelism = oldConfig.getMergePoolDefaultMaxQueryParallelism();
+      }
+    } else {
+      this.defaultMaxQueryParallelism = defaultMaxQueryParallelism;
+    }
+
+    if (targetRunTimeMillis == null) {
+      if (oldConfig == null || oldConfig.getMergePoolTargetTaskRunTimeMillis() == null) {
+        this.targetRunTimeMillis = ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.task.targetRunTimeMillis",
+            "druid.processing.merge.targetRunTimeMillis"
+        );
+        this.targetRunTimeMillis = oldConfig.getMergePoolTargetTaskRunTimeMillis();
+      }
+    } else {
+      this.targetRunTimeMillis = targetRunTimeMillis;
+    }
+
+    if (initialYieldNumRows == null) {
+      if (oldConfig == null || oldConfig.getMergePoolTaskInitialYieldRows() == null) {
+        this.initialYieldNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.task.initialYieldNumRows",
+            "druid.processing.merge.initialYieldNumRows"
+        );
+        this.initialYieldNumRows = oldConfig.getMergePoolTaskInitialYieldRows();
+      }
+    } else {
+      this.initialYieldNumRows = initialYieldNumRows;
+    }
+
+    if (smallBatchNumRows == null) {
+      if (oldConfig == null || oldConfig.getMergePoolSmallBatchRows() == null) {
+        this.smallBatchNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.task.smallBatchNumRows",
+            "druid.processing.merge.smallBatchNumRows"
+        );
+        this.smallBatchNumRows = oldConfig.getMergePoolSmallBatchRows();
+      }
+    } else {
+      this.smallBatchNumRows = smallBatchNumRows;
+    }
+  }
+
+  /**
+   * Creates a config with every setting unset and no legacy config, so all values take their built-in defaults.
+   */
+  @VisibleForTesting
+  public BrokerParallelMergeConfig()
+  {
+    this(null, null, null, null, null, null, null, null);
+  }
+
+  /**
+   * @return true only if parallel merging was not explicitly disabled and the resolved parallelism is above 2
+   */
+  public boolean useParallelMergePool()
+  {
+    return useParallelMergePool;
+  }
+
+  /**
+   * @return the resolved merge pool parallelism
+   */
+  public int getParallelism()
+  {
+    return parallelism;
+  }
+
+  /**
+   * @return the resolved merge pool shutdown wait, in milliseconds
+   */
+  public long getAwaitShutdownMillis()
+  {
+    return awaitShutdownMillis;
+  }
+
+  /**
+   * @return the resolved default maximum parallelism for a single query
+   */
+  public int getDefaultMaxQueryParallelism()
+  {
+    return defaultMaxQueryParallelism;
+  }
+
+  /**
+   * @return the resolved target run time of a merge task, in milliseconds
+   */
+  public int getTargetRunTimeMillis()
+  {
+    return targetRunTimeMillis;
+  }
+
+  /**
+   * @return the resolved initial yield row count
+   */
+  public int getInitialYieldNumRows()
+  {
+    return initialYieldNumRows;
+  }
+
+  /**
+   * @return the resolved small batch row count
+   */
+  public int getSmallBatchNumRows()
+  {
+    return smallBatchNumRows;
+  }
+
+  /**
+   * Logs a warning that a value was read from the deprecated {@code oldPath} and that {@code newPath} should be
+   * used instead.
+   */
+  private static void warnDeprecated(String oldPath, String newPath)
+  {
+    LOG.warn(
+        "Using deprecated config [%s] which has been replaced by [%s]. This path is deprecated and will be "
+        + "removed in a future release, please transition to using [%s]",
+        oldPath,
+        newPath,
+        newPath
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java
new file mode 100644
index 0000000000..25b11ab63d
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.skife.config.Config;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deprecated runtime.properties paths used by Druid 27 and older for the Broker parallel merge settings, kept
+ * for backwards compatibility. {@link BrokerParallelMergeConfig} replaced them in Druid 28; this class should
+ * be removed in Druid 29.
+ *
+ * Every getter in this base class returns null, which {@link BrokerParallelMergeConfig} treats as "this legacy
+ * path was not configured".
+ */
+@Deprecated
+public abstract class LegacyBrokerParallelMergeConfig
+{
+  /**
+   * Legacy counterpart of {@code druid.processing.merge.parallelism}.
+   */
+  @Config("druid.processing.merge.pool.parallelism")
+  @Nullable
+  public Integer getMergePoolParallelism()
+  {
+    return null;
+  }
+
+  /**
+   * Legacy counterpart of {@code druid.processing.merge.awaitShutdownMillis}.
+   */
+  @Config("druid.processing.merge.pool.awaitShutdownMillis")
+  @Nullable
+  public Long getMergePoolAwaitShutdownMillis()
+  {
+    return null;
+  }
+
+  /**
+   * Legacy counterpart of {@code druid.processing.merge.defaultMaxQueryParallelism}.
+   */
+  @Config("druid.processing.merge.pool.defaultMaxQueryParallelism")
+  @Nullable
+  public Integer getMergePoolDefaultMaxQueryParallelism()
+  {
+    return null;
+  }
+
+  /**
+   * Legacy counterpart of {@code druid.processing.merge.targetRunTimeMillis}.
+   */
+  @Config("druid.processing.merge.task.targetRunTimeMillis")
+  @Nullable
+  public Integer getMergePoolTargetTaskRunTimeMillis()
+  {
+    return null;
+  }
+
+  /**
+   * Legacy counterpart of {@code druid.processing.merge.initialYieldNumRows}.
+   */
+  @Config("druid.processing.merge.task.initialYieldNumRows")
+  @Nullable
+  public Integer getMergePoolTaskInitialYieldRows()
+  {
+    return null;
+  }
+
+  /**
+   * Legacy counterpart of {@code druid.processing.merge.smallBatchNumRows}.
+   */
+  @Config("druid.processing.merge.task.smallBatchNumRows")
+  @Nullable
+  public Integer getMergePoolSmallBatchRows()
+  {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java
deleted file mode 100644
index 1f6bc47dda..0000000000
--- a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.realtime;
-
-import org.skife.config.Config;
-
-public abstract class DbSegmentPublisherConfig
-{
- @Config("druid.metadata.storage.tables.segments")
- public abstract String getSegmentTable();
-}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index 46c0fc90d8..d486ae4fb8 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.JvmCpuMonitor;
import org.apache.druid.java.util.metrics.JvmMonitor;
import org.apache.druid.java.util.metrics.JvmThreadsMonitor;
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index f77baa4812..89cb2d76f8 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -38,7 +38,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
@@ -47,7 +47,6 @@ import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -318,16 +317,23 @@ public class CachingClusteredClientFunctionalityTest
return 0L;
}
},
- new DruidProcessingConfig()
+ new BrokerParallelMergeConfig()
{
@Override
- public String getFormatString()
+ public boolean useParallelMergePool()
{
- return null;
+ return true;
+ }
+
+ @Override
+ public int getParallelism()
+ {
+ // fixed so same behavior across all test environments
+ return 4;
}
@Override
- public int getMergePoolParallelism()
+ public int getDefaultMaxQueryParallelism()
{
// fixed so same behavior across all test environments
return 4;
@@ -335,7 +341,6 @@ public class CachingClusteredClientFunctionalityTest
},
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
- JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
index 95fce2060e..6270f36854 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
@@ -34,8 +34,8 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.TestSequence;
import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.DataSource;
-import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -49,7 +49,6 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerManagerTest;
import org.apache.druid.server.coordination.ServerType;
@@ -136,10 +135,9 @@ public class CachingClusteredClientPerfTest
Mockito.mock(CachePopulator.class),
new CacheConfig(),
Mockito.mock(DruidHttpClientConfig.class),
- Mockito.mock(DruidProcessingConfig.class),
+ Mockito.mock(BrokerParallelMergeConfig.class),
ForkJoinPool.commonPool(),
queryScheduler,
- JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index e083e074ac..0ebd441360 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -67,8 +67,8 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.nary.TrinaryFn;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.BySegmentResultValueClass;
-import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.FluentQueryRunner;
@@ -121,7 +121,6 @@ import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.coordination.ServerType;
@@ -2829,19 +2828,26 @@ public class CachingClusteredClientTest
return 0L;
}
},
- new DruidProcessingConfig()
+ new BrokerParallelMergeConfig()
{
@Override
- public String getFormatString()
+ public boolean useParallelMergePool()
{
- return null;
+ return true;
+ }
+
+ @Override
+ public int getParallelism()
+ {
+ // fixed so same behavior across all test environments
+ return 1;
}
@Override
- public int getMergePoolParallelism()
+ public int getDefaultMaxQueryParallelism()
{
// fixed so same behavior across all test environments
- return 4;
+ return 1;
}
},
ForkJoinPool.commonPool(),
@@ -2851,7 +2857,6 @@ public class CachingClusteredClientTest
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
),
- JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}
diff --git a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
index 3ce5db12a3..7fca81aaea 100644
--- a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
@@ -28,7 +28,10 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.common.config.Config;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.LegacyBrokerParallelMergeConfig;
import org.apache.druid.utils.JvmUtils;
import org.junit.Assert;
import org.junit.Assume;
@@ -37,14 +40,14 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import org.skife.config.ConfigurationObjectFactory;
+
+import java.util.Properties;
@RunWith(MockitoJUnitRunner.class)
public class BrokerProcessingModuleTest
{
- private static final boolean INJECT_SERVER_TYPE_CONFIG = true;
- @Mock
- private DruidProcessingConfig druidProcessingConfig;
private Injector injector;
private BrokerProcessingModule target;
@Mock
@@ -56,12 +59,13 @@ public class BrokerProcessingModuleTest
public void setUp()
{
target = new BrokerProcessingModule();
- injector = makeInjector(INJECT_SERVER_TYPE_CONFIG);
+ injector = makeInjector(new Properties());
}
@Test
public void testIntermediateResultsPool()
{
+ DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class);
target.getIntermediateResultsPool(druidProcessingConfig);
}
@@ -69,23 +73,30 @@ public class BrokerProcessingModuleTest
@Test
public void testMergeBufferPool()
{
+ DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class);
target.getMergeBufferPool(druidProcessingConfig);
}
@Test
public void testMergeProcessingPool()
{
- DruidProcessingConfig config = new DruidProcessingConfig()
- {
- @Override
- public String getFormatString()
- {
- return "processing-test-%s";
- }
- };
- DruidProcessingModule module = new DruidProcessingModule();
+ BrokerParallelMergeConfig config = injector.getInstance(BrokerParallelMergeConfig.class);
+ BrokerProcessingModule module = new BrokerProcessingModule();
module.getMergeProcessingPoolProvider(config);
- config.getNumInitalBuffersForIntermediatePool();
+ }
+
+ @Test
+ public void testMergeProcessingPoolLegacyConfigs()
+ {
+ Properties props = new Properties();
+ props.put("druid.processing.merge.pool.parallelism", "10");
+ props.put("druid.processing.merge.pool.defaultMaxQueryParallelism", "10");
+ props.put("druid.processing.merge.task.targetRunTimeMillis", "1000");
+ Injector gadget = makeInjector(props);
+ BrokerParallelMergeConfig config = gadget.getInstance(BrokerParallelMergeConfig.class);
+ Assert.assertEquals(10, config.getParallelism());
+ Assert.assertEquals(10, config.getDefaultMaxQueryParallelism());
+ Assert.assertEquals(1000, config.getTargetRunTimeMillis());
}
@Test
@@ -93,7 +104,6 @@ public class BrokerProcessingModuleTest
{
CachePopulator cachePopulator = injector.getInstance(CachePopulator.class);
Assert.assertNotNull(cachePopulator);
-
}
@Test(expected = ProvisionException.class)
@@ -107,43 +117,40 @@ public class BrokerProcessingModuleTest
catch (UnsupportedOperationException e) {
Assume.assumeNoException(e);
}
+ Properties props = new Properties();
+ props.setProperty("druid.processing.buffer.sizeBytes", "3GiB");
+ Injector injector1 = makeInjector(props);
+ DruidProcessingConfig processingBufferConfig = injector1.getInstance(DruidProcessingConfig.class);
BrokerProcessingModule module = new BrokerProcessingModule();
- module.getMergeBufferPool(new DruidProcessingConfig()
- {
- @Override
- public String getFormatString()
- {
- return "test";
- }
-
- @Override
- public int intermediateComputeSizeBytes()
- {
- return Integer.MAX_VALUE;
- }
- });
+ module.getMergeBufferPool(processingBufferConfig);
}
- private Injector makeInjector(boolean withServerTypeConfig)
+ private Injector makeInjector(Properties props)
{
- return Initialization.makeInjectorWithModules(
- GuiceInjectors.makeStartupInjector(), (ImmutableList.of(Modules.override(
- (binder) -> {
- binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
- binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
- binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
- binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig);
- },
- target
- ).with(
- (binder) -> {
+
+ Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(),
+ ImmutableList.of(
+ Modules.override(
+ (binder) -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
+ binder.bind(Properties.class).toInstance(props);
+ ConfigurationObjectFactory factory = Config.createFactory(props);
+ LegacyBrokerParallelMergeConfig legacyConfig = factory.build(LegacyBrokerParallelMergeConfig.class);
+ binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
+ binder.bind(LegacyBrokerParallelMergeConfig.class).toInstance(legacyConfig);
+ },
+ target
+ ).with((binder) -> {
binder.bind(CachePopulatorStats.class).toInstance(cachePopulatorStats);
binder.bind(CacheConfig.class).toInstance(cacheConfig);
- }
+ })
)
- )));
+ );
+ return injector;
}
-
}
diff --git a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java
index b51f2c1485..8fad8924bf 100644
--- a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java
@@ -77,7 +77,7 @@ public class QueryableModuleTest
new JacksonModule(),
new ConfigModule(),
new QueryRunnerFactoryModule(),
- new DruidProcessingConfigModule(),
+ new LegacyBrokerParallelMergeConfigModule(),
new BrokerProcessingModule(),
new LifecycleModule(),
binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class),
diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
index d07bfdef34..308427d40f 100644
--- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
+++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
@@ -49,7 +49,6 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@@ -108,7 +107,6 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
{
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(
CLOSER,
- USE_PARALLEL_MERGE_POOL_CONFIGURED,
() -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD
);
@@ -142,13 +140,9 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig(),
- QueryStackTests.getProcessingConfig(
- USE_PARALLEL_MERGE_POOL_CONFIGURED,
- DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
- ),
+ QueryStackTests.getParallelMergeConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
- JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
servers = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 1c23edf3b9..69305adc12 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -100,6 +101,9 @@ public class QueryStackTests
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
);
+
+ public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
+
private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
@@ -187,10 +191,19 @@ public class QueryStackTests
);
}
- public static DruidProcessingConfig getProcessingConfig(
- boolean useParallelMergePoolConfigured,
- final int mergeBuffers
+ public static BrokerParallelMergeConfig getParallelMergeConfig(
+ boolean useParallelMergePoolConfigured
)
+ {
+ return new BrokerParallelMergeConfig() {
+ @Override
+ public boolean useParallelMergePool()
+ {
+ return useParallelMergePoolConfigured;
+ }
+ };
+ }
+ public static DruidProcessingConfig getProcessingConfig(final int mergeBuffers)
{
return new DruidProcessingConfig()
{
@@ -221,12 +234,6 @@ public class QueryStackTests
}
return mergeBuffers;
}
-
- @Override
- public boolean useParallelMergePoolConfigured()
- {
- return useParallelMergePoolConfigured;
- }
};
}
@@ -235,28 +242,18 @@ public class QueryStackTests
*/
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer)
{
- return createQueryRunnerFactoryConglomerate(closer, true, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD);
- }
-
- public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
- final Closer closer,
- final Supplier<Integer> minTopNThresholdSupplier
- )
- {
- return createQueryRunnerFactoryConglomerate(closer, true, minTopNThresholdSupplier);
+ return createQueryRunnerFactoryConglomerate(closer, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD);
}
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
final Closer closer,
- final boolean useParallelMergePoolConfigured,
final Supplier<Integer> minTopNThresholdSupplier
)
{
return createQueryRunnerFactoryConglomerate(
closer,
getProcessingConfig(
- useParallelMergePoolConfigured,
- DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
+ DEFAULT_NUM_MERGE_BUFFERS
),
minTopNThresholdSupplier
);
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 6bbcdf7f3a..dd9c3c3c0b 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -45,6 +45,7 @@ import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.QueryRunnerFactoryModule;
@@ -108,6 +109,7 @@ public class CliBroker extends ServerRunnable
protected List<? extends Module> getModules()
{
return ImmutableList.of(
+ new LegacyBrokerParallelMergeConfigModule(),
new BrokerProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java
index 566f89d5fd..afbd58ab95 100644
--- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java
+++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java
@@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
@@ -55,7 +54,6 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DirectQueryProcessingPool;
-import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@@ -81,7 +79,6 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.BaseColumn;
-import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
@@ -733,39 +730,10 @@ public class DumpSegment extends GuiceRunnable
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
- new Module()
- {
- @Override
- public void configure(Binder binder)
- {
- binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
- binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
- binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
- binder.bind(DruidProcessingConfig.class).toInstance(
- new DruidProcessingConfig()
- {
- @Override
- public String getFormatString()
- {
- return "processing-%s";
- }
-
- @Override
- public int intermediateComputeSizeBytes()
- {
- return 100 * 1024 * 1024;
- }
-
- @Override
- public int getNumThreads()
- {
- return 1;
- }
-
- }
- );
- binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
- }
+ binder -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
}
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java
index 925d7c81a8..f9e2ce627b 100644
--- a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java
+++ b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java
@@ -23,7 +23,6 @@ import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
@@ -32,9 +31,7 @@ import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.column.ColumnConfig;
import java.io.File;
import java.util.List;
@@ -85,39 +82,10 @@ public class ValidateSegments extends GuiceRunnable
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
- new Module()
- {
- @Override
- public void configure(Binder binder)
- {
- binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
- binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
- binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
- binder.bind(DruidProcessingConfig.class).toInstance(
- new DruidProcessingConfig()
- {
- @Override
- public String getFormatString()
- {
- return "processing-%s";
- }
-
- @Override
- public int intermediateComputeSizeBytes()
- {
- return 100 * 1024 * 1024;
- }
-
- @Override
- public int getNumThreads()
- {
- return 1;
- }
-
- }
- );
- binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
- }
+ binder -> {
+ binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
+ binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
+ binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
}
);
}
diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
index 10027ae73b..fb440311a6 100644
--- a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
+++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
@@ -25,12 +25,15 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import com.google.inject.Key;
+import com.google.inject.name.Names;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.NestedDataModule;
+import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.initialization.ServerInjectorBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -48,6 +51,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex;
@@ -215,6 +219,22 @@ public class DumpSegmentTest extends InitializedNullHandlingTest
}
}
+ @Test
+ public void testGetModules()
+ {
+ DumpSegment dumpSegment = new DumpSegment();
+ Injector injector = ServerInjectorBuilder.makeServerInjector(
+ new StartupInjectorBuilder().forServer().build(),
+ Collections.emptySet(),
+ dumpSegment.getModules()
+ );
+ Assert.assertNotNull(injector.getInstance(ColumnConfig.class));
+ Assert.assertEquals("druid/tool", injector.getInstance(Key.get(String.class, Names.named("serviceName"))));
+ Assert.assertEquals(9999, (int) injector.getInstance(Key.get(Integer.class, Names.named("servicePort"))));
+ Assert.assertEquals(-1, (int) injector.getInstance(Key.get(Integer.class, Names.named("tlsServicePort"))));
+ }
+
+
public static List<Segment> createSegments(
AggregationTestHelper helper,
TemporaryFolder tempFolder,
diff --git a/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java
new file mode 100644
index 0000000000..03ce574196
--- /dev/null
+++ b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
+import org.apache.druid.data.input.ResourceInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.initialization.ServerInjectorBuilder;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class ValidateSegmentsTest extends InitializedNullHandlingTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testValidateSegments() throws IOException
+ {
+
+ JsonInputFormat inputFormat = new JsonInputFormat(
+ JSONPathSpec.DEFAULT,
+ null,
+ null,
+ null,
+ null
+ );
+ IndexBuilder bob = IndexBuilder.create()
+ .tmpDir(temporaryFolder.newFolder())
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec())
+ .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec())
+ .withMetrics(
+ new CountAggregatorFactory("cnt")
+ )
+ .withRollup(false)
+ .build()
+ )
+ .inputSource(
+ ResourceInputSource.of(
+ NestedDataTestUtils.class.getClassLoader(),
+ NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE
+ )
+ )
+ .inputFormat(inputFormat)
+ .inputTmpDir(temporaryFolder.newFolder());
+
+ final File segment1 = bob.buildMMappedIndexFile();
+ final File segment2 = bob.buildMMappedIndexFile();
+ final Injector injector = Mockito.mock(Injector.class);
+ Mockito.when(injector.getInstance(IndexIO.class)).thenReturn(bob.getIndexIO());
+ ValidateSegments validator = new ValidateSegments() {
+ @Override
+ public Injector makeInjector()
+ {
+ return injector;
+ }
+ };
+ validator.directories = Arrays.asList(segment1.getAbsolutePath(), segment2.getAbsolutePath());
+ // if this doesn't pass, it throws a runtime exception, which would fail the test
+ validator.run();
+ }
+
+ @Test
+ public void testGetModules()
+ {
+ ValidateSegments validator = new ValidateSegments();
+ Injector injector = ServerInjectorBuilder.makeServerInjector(
+ new StartupInjectorBuilder().forServer().build(),
+ Collections.emptySet(),
+ validator.getModules()
+ );
+ Assert.assertNotNull(injector.getInstance(ColumnConfig.class));
+ Assert.assertEquals("druid/tool", injector.getInstance(Key.get(String.class, Names.named("serviceName"))));
+ Assert.assertEquals(9999, (int) injector.getInstance(Key.get(Integer.class, Names.named("servicePort"))));
+ Assert.assertEquals(-1, (int) injector.getInstance(Key.get(Integer.class, Names.named("tlsServicePort"))));
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
index 0881a969fc..3467d71bb6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
@@ -219,7 +219,7 @@ public class SqlTestFramework
} else {
return QueryStackTests.createQueryRunnerFactoryConglomerate(
resourceCloser,
- QueryStackTests.getProcessingConfig(true, builder.mergeBufferCount)
+ QueryStackTests.getProcessingConfig(builder.mergeBufferCount)
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org