You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/08/17 01:24:03 UTC

[druid] branch master updated: deprecate config-magic in favor of json configuration stuff (#14695)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b14dde50e deprecate config-magic in favor of json configuration stuff (#14695)
6b14dde50e is described below

commit 6b14dde50ef82d3ba8faddc5bc9517e4ae3c461c
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Aug 16 18:23:57 2023 -0700

    deprecate config-magic in favor of json configuration stuff (#14695)
    
    * json config based processing and broker merge configs to deprecate config-magic
---
 .../query/CachingClusteredClientBenchmark.java     |  17 +-
 .../benchmark/query/SqlExpressionBenchmark.java    |   7 -
 .../benchmark/query/SqlNestedDataBenchmark.java    |   8 +-
 .../movingaverage/MovingAverageQueryTest.java      |  13 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   2 +-
 .../kafka/supervisor/KafkaSupervisorSpec.java      |   2 +-
 .../kafka/supervisor/KafkaSupervisorSpecTest.java  |   2 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |   2 +-
 .../kinesis/supervisor/KinesisSupervisorSpec.java  |   2 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   2 +-
 .../supervisor/SeekableStreamSupervisor.java       |   4 +-
 .../supervisor/SeekableStreamSupervisorSpec.java   |   2 +-
 .../SeekableStreamSupervisorSpecTest.java          |   2 +-
 .../SeekableStreamSupervisorStateTest.java         |  34 ++-
 .../apache/druid/testsEx/config/Initializer.java   |   4 +-
 .../org/apache/druid/guice/ConfigProvider.java     |   6 +-
 .../common/concurrent/ExecutorServiceConfig.java   |  57 -----
 .../util/metrics/AllocationMetricCollector.java    |   2 +-
 .../java/util/metrics/BasicMonitorScheduler.java   |   4 +-
 .../metrics/ClockDriftSafeMonitorScheduler.java    |   4 +-
 .../util}/metrics/DruidMonitorSchedulerConfig.java |  15 +-
 .../druid/java/util/metrics/MonitorScheduler.java  |   6 +-
 .../java/util/metrics/MonitorSchedulerConfig.java  |  33 ---
 .../druid/query/DruidProcessingBufferConfig.java   |  78 +++++++
 .../apache/druid/query/DruidProcessingConfig.java  | 232 +++++++++------------
 .../druid/query/DruidProcessingIndexesConfig.java  |  67 ++++++
 .../aggregation/first/StringFirstLastUtils.java    |   1 +
 .../java/org/apache/druid/segment/IndexIO.java     |   5 +
 .../apache/druid/segment/column/ColumnConfig.java  |  20 +-
 .../util/metrics/BasicMonitorSchedulerTest.java    |   4 +-
 .../ClockDriftSafeMonitorSchedulerTest.java        |  18 +-
 .../java/util/metrics/MonitorSchedulerTest.java    |   4 +-
 .../druid/query/DruidProcessingConfigTest.java     |  98 ++++-----
 .../druid/client/CachingClusteredClient.java       |  22 +-
 .../apache/druid/guice/BrokerProcessingModule.java |  13 +-
 .../apache/druid/guice/DruidProcessingModule.java  |  26 +--
 ... => LegacyBrokerParallelMergeConfigModule.java} |  15 +-
 .../apache/druid/guice/RouterProcessingModule.java |  15 +-
 .../org/apache/druid/guice/StorageNodeModule.java  |   1 -
 .../druid/initialization/CoreInjectorBuilder.java  |   2 -
 .../druid/query/BrokerParallelMergeConfig.java     | 215 +++++++++++++++++++
 .../query/LegacyBrokerParallelMergeConfig.java     |  74 +++++++
 .../segment/realtime/DbSegmentPublisherConfig.java |  28 ---
 .../apache/druid/server/metrics/MetricsModule.java |   1 +
 .../CachingClusteredClientFunctionalityTest.java   |  19 +-
 .../client/CachingClusteredClientPerfTest.java     |   6 +-
 .../druid/client/CachingClusteredClientTest.java   |  21 +-
 .../druid/guice/BrokerProcessingModuleTest.java    |  95 +++++----
 .../apache/druid/guice/QueryableModuleTest.java    |   2 +-
 .../QueryRunnerBasedOnClusteredClientTestBase.java |   8 +-
 .../org/apache/druid/server/QueryStackTests.java   |  39 ++--
 .../main/java/org/apache/druid/cli/CliBroker.java  |   2 +
 .../java/org/apache/druid/cli/DumpSegment.java     |  40 +---
 .../org/apache/druid/cli/ValidateSegments.java     |  40 +---
 .../java/org/apache/druid/cli/DumpSegmentTest.java |  20 ++
 .../org/apache/druid/cli/ValidateSegmentsTest.java | 117 +++++++++++
 .../druid/sql/calcite/util/SqlTestFramework.java   |   2 +-
 57 files changed, 952 insertions(+), 628 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index 6c9c6aa3d4..e38ee8862d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.BySegmentQueryRunner;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.DruidProcessingConfig;
@@ -103,7 +104,6 @@ import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -258,12 +258,6 @@ public class CachingClusteredClientBenchmark
       {
         return numProcessingThreads;
       }
-
-      @Override
-      public boolean useParallelMergePool()
-      {
-        return true;
-      }
     };
 
     conglomerate = new DefaultQueryRunnerFactoryConglomerate(
@@ -339,10 +333,15 @@ public class CachingClusteredClientBenchmark
         new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
         new CacheConfig(),
         new DruidHttpClientConfig(),
-        processingConfig,
+        new BrokerParallelMergeConfig() {
+          @Override
+          public boolean useParallelMergePool()
+          {
+            return true;
+          }
+        },
         forkJoinPool,
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
         new NoopServiceEmitter()
     );
   }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index 7733281908..80d208c5a4 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -106,13 +106,6 @@ public class SqlExpressionBenchmark
     {
       return 1;
     }
-
-    @Override
-    public boolean useParallelMergePoolConfigured()
-    {
-      return true;
-    }
-
     @Override
     public String getFormatString()
     {
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
index 98514512e9..da42aaeefc 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
@@ -112,13 +112,7 @@ public class SqlNestedDataBenchmark
     {
       return 1;
     }
-
-    @Override
-    public boolean useParallelMergePoolConfigured()
-    {
-      return true;
-    }
-
+    
     @Override
     public String getFormatString()
     {
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 2945bdb1e5..223048200f 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -54,7 +54,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
@@ -69,7 +69,6 @@ import org.apache.druid.query.movingaverage.test.TestConfig;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.ClientQuerySegmentWalker;
 import org.apache.druid.server.QueryStackTests;
@@ -363,17 +362,9 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
             return 0L;
           }
         },
-        new DruidProcessingConfig()
-        {
-          @Override
-          public String getFormatString()
-          {
-            return null;
-          }
-        },
+        new BrokerParallelMergeConfig(),
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())),
         new NoopServiceEmitter()
     );
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index a146e090b5..e9c6b835fd 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -55,8 +55,8 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index 9337175378..5d84eeed1f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -33,9 +33,9 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 9ff0c0bd78..f8feeae9be 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -30,10 +30,10 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 5f1311085c..2ff8c5bf91 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -80,6 +80,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -89,7 +90,6 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index eb9715f9c9..4aa60ea032 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -36,9 +36,9 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index e489061408..127683f8e4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -76,6 +76,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -85,7 +86,6 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Action;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index a72ba05eac..f8b41fce61 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4077,13 +4077,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     reportingExec.scheduleAtFixedRate(
         this::emitLag,
         ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
-        spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+        spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
         TimeUnit.MILLISECONDS
     );
     reportingExec.scheduleAtFixedRate(
         this::emitNoticesQueueSize,
         ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
-        spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+        spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
         TimeUnit.MILLISECONDS
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 90b0b70563..b7d4ba2f27 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -35,9 +35,9 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFac
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 
 import javax.annotation.Nullable;
 import java.util.List;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 05e6401a8f..f0b17657d8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -55,13 +55,13 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.joda.time.DateTime;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index c592b9375f..46f86bacb4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -77,7 +78,6 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -102,6 +102,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -501,7 +502,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
       @Override
-      public Duration getEmitterPeriod()
+      public Duration getEmissionDuration()
       {
         return new Period("PT1S").toStandardDuration();
       }
@@ -1062,6 +1063,29 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertEquals(0, emitter.getEvents().size());
   }
 
+  @Test
+  public void testScheduleReporting()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig();
+    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(2);
+    ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class);
+    EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once();
+    EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2);
+
+    EasyMock.replay(executorService, spec);
+    final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor()
+    {
+      @Override
+      public LagStats computeLagStats()
+      {
+        return new LagStats(0, 0, 0);
+      }
+    };
+    supervisor.scheduleReporting(executorService);
+    EasyMock.verify(executorService, spec);
+  }
+
   private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
   {
     List<Event> result = events.stream()
@@ -1098,7 +1122,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
       @Override
-      public Duration getEmitterPeriod()
+      public Duration getEmissionDuration()
       {
         return new Period("PT1S").toStandardDuration();
       }
@@ -1608,13 +1632,13 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
       reportingExec.scheduleAtFixedRate(
           this::emitLag,
           ioConfig.getStartDelay().getMillis(),
-          spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+          spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
           TimeUnit.MILLISECONDS
       );
       reportingExec.scheduleAtFixedRate(
           this::emitNoticesQueueSize,
           ioConfig.getStartDelay().getMillis(),
-          spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
+          spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
           TimeUnit.MILLISECONDS
       );
     }
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
index 7a2eae93e1..7931115c6e 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/Initializer.java
@@ -35,9 +35,9 @@ import org.apache.druid.curator.discovery.DiscoveryModule;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.AnnouncerModule;
-import org.apache.druid.guice.DruidProcessingConfigModule;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.SQLMetadataStorageDruidModule;
@@ -496,7 +496,7 @@ public class Initializer
             new AnnouncerModule(),
             new DiscoveryModule(),
             // Dependencies from other modules
-            new DruidProcessingConfigModule(),
+            new LegacyBrokerParallelMergeConfigModule(),
             // Dependencies from other modules
             new StorageNodeModule(),
 
diff --git a/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java b/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
index 538f2a6137..66cc6261c9 100644
--- a/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
+++ b/processing/src/main/java/org/apache/druid/guice/ConfigProvider.java
@@ -31,6 +31,7 @@ import java.util.Map;
 /**
  *
  */
+@Deprecated
 public class ConfigProvider<T> implements Provider<T>
 {
   private static final Logger log = new Logger(ConfigProvider.class);
@@ -40,11 +41,6 @@ public class ConfigProvider<T> implements Provider<T>
     binder.bind(clazz).toProvider(of(clazz)).in(LazySingleton.class);
   }
 
-  public static <T> void bind(Binder binder, Class<T> clazz, Map<String, String> replacements)
-  {
-    binder.bind(clazz).toProvider(of(clazz, replacements)).in(LazySingleton.class);
-  }
-
   public static <T> Provider<T> of(Class<T> clazz)
   {
     return of(clazz, null);
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java
deleted file mode 100644
index 6d6327fbdd..0000000000
--- a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ExecutorServiceConfig.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.common.concurrent;
-
-import org.apache.druid.utils.JvmUtils;
-import org.skife.config.Config;
-import org.skife.config.Default;
-
-/**
- */
-public abstract class ExecutorServiceConfig
-{
-  public static final int DEFAULT_NUM_THREADS = -1;
-
-  @Config(value = "${base_path}.formatString")
-  @Default("processing-%s")
-  public abstract String getFormatString();
-
-  public int getNumThreads()
-  {
-    int numThreadsConfigured = getNumThreadsConfigured();
-    if (numThreadsConfigured != DEFAULT_NUM_THREADS) {
-      return numThreadsConfigured;
-    } else {
-      return Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1);
-    }
-  }
-
-  /**
-   * Returns the number of threads _explicitly_ configured, or -1 if it is not explicitly configured, that is not
-   * a valid number of threads. To get the configured value or the default (valid) number, use {@link #getNumThreads()}.
-   * This method exists for ability to distinguish between the default value set when there is no explicit config, and
-   * an explicitly configured value.
-   */
-  @Config(value = "${base_path}.numThreads")
-  public int getNumThreadsConfigured()
-  {
-    return DEFAULT_NUM_THREADS;
-  }
-}
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
index a0a9d7f645..1ccb910562 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollector.java
@@ -52,7 +52,7 @@ class AllocationMetricCollector
    * Tests show the call to getThreadAllocatedBytes for a single thread ID out of 500 threads running takes around
    * 9000 ns (in the worst case), which for 500 IDs should take 500*9000/1000/1000 = 4.5 ms to the max.
    * AllocationMetricCollector takes linear time to calculate delta, for 500 threads it's negligible.
-   * See the default emitting period {@link MonitorSchedulerConfig#getEmitterPeriod}.
+   * See the default emitting period {@link DruidMonitorSchedulerConfig#getEmissionDuration}.
    *
    * @return all threads summed allocated bytes delta
    */
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
index 0d5c07d49b..c57bbc0dd1 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java
@@ -34,7 +34,7 @@ public class BasicMonitorScheduler extends MonitorScheduler
   private final ScheduledExecutorService exec;
 
   public BasicMonitorScheduler(
-      MonitorSchedulerConfig config,
+      DruidMonitorSchedulerConfig config,
       ServiceEmitter emitter,
       List<Monitor> monitors,
       ScheduledExecutorService exec
@@ -50,7 +50,7 @@ public class BasicMonitorScheduler extends MonitorScheduler
     monitor.start();
     ScheduledExecutors.scheduleAtFixedRate(
         exec,
-        getConfig().getEmitterPeriod(),
+        getConfig().getEmissionDuration(),
         () -> {
           if (hasMonitor(monitor) && monitor.monitor(getEmitter())) {
             return Signal.REPEAT;
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
index 6fcb244069..68b7bacedd 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java
@@ -41,7 +41,7 @@ public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
   private final ExecutorService monitorRunner;
 
   public ClockDriftSafeMonitorScheduler(
-      MonitorSchedulerConfig config,
+      DruidMonitorSchedulerConfig config,
       ServiceEmitter emitter,
       List<Monitor> monitors,
       CronScheduler monitorScheduler,
@@ -57,7 +57,7 @@ public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
   void startMonitor(final Monitor monitor)
   {
     monitor.start();
-    long rate = getConfig().getEmitterPeriod().getMillis();
+    long rate = getConfig().getEmissionDuration().getMillis();
     final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
     Future<?> future = monitorScheduler.scheduleAtFixedRate(
         rate,
diff --git a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
similarity index 76%
rename from server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
rename to processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
index 0e242b1244..96a554e6ef 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/DruidMonitorSchedulerConfig.java
@@ -17,17 +17,15 @@
  * under the License.
  */
 
-package org.apache.druid.server.metrics;
+package org.apache.druid.java.util.metrics;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
-import org.apache.druid.java.util.metrics.MonitorSchedulerConfig;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 
 /**
  */
-public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
+public class DruidMonitorSchedulerConfig
 {
   @JsonProperty
   private String schedulerClassName = BasicMonitorScheduler.class.getName();
@@ -40,14 +38,7 @@ public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
     return schedulerClassName;
   }
 
-  @JsonProperty
-  public Period getEmissionPeriod()
-  {
-    return emissionPeriod;
-  }
-
-  @Override
-  public Duration getEmitterPeriod()
+  public Duration getEmissionDuration()
   {
     return emissionPeriod.toStandardDuration();
   }
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 9e159182a1..47bb37d161 100644
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -39,7 +39,7 @@ public abstract class MonitorScheduler
 {
   private static final Logger log = new Logger(MonitorScheduler.class);
 
-  private final MonitorSchedulerConfig config;
+  private final DruidMonitorSchedulerConfig config;
   private final ServiceEmitter emitter;
   private final Set<Monitor> monitors;
   private final Object lock = new Object();
@@ -47,7 +47,7 @@ public abstract class MonitorScheduler
   private volatile boolean started = false;
 
   MonitorScheduler(
-      MonitorSchedulerConfig config,
+      DruidMonitorSchedulerConfig config,
       ServiceEmitter emitter,
       List<Monitor> monitors
   )
@@ -135,7 +135,7 @@ public abstract class MonitorScheduler
     }
   }
 
-  MonitorSchedulerConfig getConfig()
+  DruidMonitorSchedulerConfig getConfig()
   {
     return config;
   }
diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java
deleted file mode 100644
index 8d0e165e2a..0000000000
--- a/processing/src/main/java/org/apache/druid/java/util/metrics/MonitorSchedulerConfig.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.metrics;
-
-import org.joda.time.Duration;
-import org.skife.config.Config;
-import org.skife.config.Default;
-
-/**
- */
-public abstract class MonitorSchedulerConfig
-{
-  @Config({"org.apache.druid.java.util.metrics.emitter.period", "com.metamx.druid.emitter.period"})
-  @Default("PT60s")
-  public abstract Duration getEmitterPeriod();
-}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java
new file mode 100644
index 0000000000..ac6b63c8ad
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingBufferConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
+import javax.annotation.Nullable;
+
+public class DruidProcessingBufferConfig
+{
+  public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1);
+  public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
+  public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;
+
+  @JsonProperty
+  private final HumanReadableBytes sizeBytes;
+
+  @JsonProperty
+  private final int poolCacheMaxCount;
+
+  @JsonProperty
+  private final int poolCacheInitialCount;
+
+  @JsonCreator
+  public DruidProcessingBufferConfig(
+      @JsonProperty("sizeBytes") @Nullable HumanReadableBytes sizeBytes,
+      @JsonProperty("poolCacheMaxCount") @Nullable Integer poolCacheMaxCount,
+      @JsonProperty("poolCacheInitialCount") @Nullable Integer poolCacheInitialCount
+  )
+  {
+    this.sizeBytes = Configs.valueOrDefault(sizeBytes, DEFAULT_PROCESSING_BUFFER_SIZE_BYTES);
+    this.poolCacheInitialCount = Configs.valueOrDefault(
+        poolCacheInitialCount,
+        DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL
+    );
+    this.poolCacheMaxCount = Configs.valueOrDefault(poolCacheMaxCount, Integer.MAX_VALUE);
+  }
+
+  public DruidProcessingBufferConfig()
+  {
+    this(null, null, null);
+  }
+
+  public HumanReadableBytes getBufferSize()
+  {
+    return sizeBytes;
+  }
+
+  public int getPoolCacheMaxCount()
+  {
+    return poolCacheMaxCount;
+  }
+
+  public int getPoolCacheInitialCount()
+  {
+    return poolCacheInitialCount;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index 823c9e71ef..ca645d16e9 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -19,45 +19,82 @@
 
 package org.apache.druid.query;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
-import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.utils.JvmUtils;
-import org.skife.config.Config;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReference;
 
-public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig
+public class DruidProcessingConfig implements ColumnConfig
 {
   private static final Logger log = new Logger(DruidProcessingConfig.class);
 
-  public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
-  public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1);
-  public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
-  public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
-  public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;
-
-  private AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
-
-  @Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"})
-  public HumanReadableBytes intermediateComputeSizeBytesConfigured()
-  {
-    return DEFAULT_PROCESSING_BUFFER_SIZE_BYTES;
-  }
-
-  public int intermediateComputeSizeBytes()
-  {
-    HumanReadableBytes sizeBytesConfigured = intermediateComputeSizeBytesConfigured();
-    if (!DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) {
+  @JsonProperty
+  private final String formatString;
+  @JsonProperty
+  private final int numThreads;
+  @JsonProperty
+  private final int numMergeBuffers;
+  @JsonProperty
+  private final boolean fifo;
+  @JsonProperty
+  private final String tmpDir;
+  @JsonProperty
+  private final DruidProcessingBufferConfig buffer;
+  @JsonProperty
+  private final DruidProcessingIndexesConfig indexes;
+  private final AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
+  private final boolean numThreadsConfigured;
+  private final boolean numMergeBuffersConfigured;
+
+  @JsonCreator
+  public DruidProcessingConfig(
+      @JsonProperty("formatString") @Nullable String formatString,
+      @JsonProperty("numThreads") @Nullable Integer numThreads,
+      @JsonProperty("numMergeBuffers") @Nullable Integer numMergeBuffers,
+      @JsonProperty("fifo") @Nullable Boolean fifo,
+      @JsonProperty("tmpDir") @Nullable String tmpDir,
+      @JsonProperty("buffer") DruidProcessingBufferConfig buffer,
+      @JsonProperty("indexes") DruidProcessingIndexesConfig indexes
+  )
+  {
+    this.formatString = Configs.valueOrDefault(formatString, "processing-%s");
+    this.numThreads = Configs.valueOrDefault(
+        numThreads,
+        Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1, 1)
+    );
+    this.numMergeBuffers = Configs.valueOrDefault(numMergeBuffers, Math.max(2, this.numThreads / 4));
+    this.fifo = fifo == null || fifo;
+    this.tmpDir = Configs.valueOrDefault(tmpDir, System.getProperty("java.io.tmpdir"));
+    this.buffer = Configs.valueOrDefault(buffer, new DruidProcessingBufferConfig());
+    this.indexes = Configs.valueOrDefault(indexes, new DruidProcessingIndexesConfig());
+
+    this.numThreadsConfigured = numThreads != null;
+    this.numMergeBuffersConfigured = numMergeBuffers != null;
+    initializeBufferSize();
+  }
+
+  @VisibleForTesting
+  public DruidProcessingConfig()
+  {
+    this(null, null, null, null, null, null, null);
+  }
+
+  private void initializeBufferSize()
+  {
+    HumanReadableBytes sizeBytesConfigured = this.buffer.getBufferSize();
+    if (!DruidProcessingBufferConfig.DEFAULT_PROCESSING_BUFFER_SIZE_BYTES.equals(sizeBytesConfigured)) {
       if (sizeBytesConfigured.getBytes() > Integer.MAX_VALUE) {
         throw new IAE("druid.processing.buffer.sizeBytes must be less than 2GiB");
       }
-      return sizeBytesConfigured.getBytesInInt();
-    } else if (computedBufferSizeBytes.get() != null) {
-      return computedBufferSizeBytes.get();
+      computedBufferSizeBytes.set(sizeBytesConfigured.getBytesInInt());
     }
 
     long directSizeBytes;
@@ -70,173 +107,90 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
     }
     catch (UnsupportedOperationException e) {
       // max direct memory defaults to max heap size on recent JDK version, unless set explicitly
-      directSizeBytes = computeMaxMemoryFromMaxHeapSize();
+      directSizeBytes = Runtime.getRuntime().maxMemory() / 4;
       log.info("Using up to [%,d] bytes of direct memory for computation buffers.", directSizeBytes);
     }
 
-    int numProcessingThreads = getNumThreads();
-    int numMergeBuffers = getNumMergeBuffers();
-    int totalNumBuffers = numMergeBuffers + numProcessingThreads;
+    int totalNumBuffers = this.numMergeBuffers + this.numThreads;
     int sizePerBuffer = (int) ((double) directSizeBytes / (double) (totalNumBuffers + 1));
 
-    final int computedSizePerBuffer = Math.min(sizePerBuffer, MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES);
+    final int computedSizePerBuffer = Math.min(
+        sizePerBuffer,
+        DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES
+    );
     if (computedBufferSizeBytes.compareAndSet(null, computedSizePerBuffer)) {
       log.info(
           "Auto sizing buffers to [%,d] bytes each for [%,d] processing and [%,d] merge buffers. "
           + "If you run out of direct memory, you may need to set these parameters explicitly using the guidelines at "
           + "https://druid.apache.org/docs/latest/operations/basic-cluster-tuning.html#processing-threads-buffers.",
           computedSizePerBuffer,
-          numProcessingThreads,
-          numMergeBuffers
+          this.numThreads,
+          this.numMergeBuffers
       );
     }
-    return computedSizePerBuffer;
-  }
-
-  public static long computeMaxMemoryFromMaxHeapSize()
-  {
-    return Runtime.getRuntime().maxMemory() / 4;
   }
 
-  @Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"})
-  public int poolCacheMaxCount()
+  public String getFormatString()
   {
-    return Integer.MAX_VALUE;
+    return formatString;
   }
 
-  @Config({
-      "druid.computation.buffer.poolCacheInitialCount",
-      "${base_path}.buffer.poolCacheInitialCount"
-  })
-  public int getNumInitalBuffersForIntermediatePool()
+  public int getNumThreads()
   {
-    return DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL;
-  }
-
-  @Override
-  @Config(value = "${base_path}.numThreads")
-  public int getNumThreadsConfigured()
-  {
-    return DEFAULT_NUM_THREADS;
+    return numThreads;
   }
 
   public int getNumMergeBuffers()
   {
-    int numMergeBuffersConfigured = getNumMergeBuffersConfigured();
-    if (numMergeBuffersConfigured != DEFAULT_NUM_MERGE_BUFFERS) {
-      return numMergeBuffersConfigured;
-    } else {
-      return Math.max(2, getNumThreads() / 4);
-    }
-  }
-
-  /**
-   * Returns the number of merge buffers _explicitly_ configured, or -1 if it is not explicitly configured, that is not
-   * a valid number of buffers. To get the configured value or the default (valid) number, use {@link
-   * #getNumMergeBuffers()}. This method exists for ability to distinguish between the default value set when there is
-   * no explicit config, and an explicitly configured value.
-   */
-  @Config("${base_path}.numMergeBuffers")
-  public int getNumMergeBuffersConfigured()
-  {
-    return DEFAULT_NUM_MERGE_BUFFERS;
-  }
-
-  @Override
-  @Config(value = "${base_path}.indexes.skipValueRangeIndexScale")
-  public double skipValueRangeIndexScale()
-  {
-    return ColumnConfig.super.skipValueRangeIndexScale();
-  }
-
-  @Override
-  @Config(value = "${base_path}.indexes.skipValuePredicateIndexScale")
-  public double skipValuePredicateIndexScale()
-  {
-    return ColumnConfig.super.skipValuePredicateIndexScale();
+    return numMergeBuffers;
   }
 
-  @Config(value = "${base_path}.fifo")
   public boolean isFifo()
   {
-    return true;
+    return fifo;
   }
 
-  @Config(value = "${base_path}.tmpDir")
   public String getTmpDir()
   {
-    return System.getProperty("java.io.tmpdir");
+    return tmpDir;
   }
 
-  @Config(value = "${base_path}.merge.useParallelMergePool")
-  public boolean useParallelMergePoolConfigured()
+  public int intermediateComputeSizeBytes()
   {
-    return true;
-  }
 
-  public boolean useParallelMergePool()
-  {
-    final boolean useParallelMergePoolConfigured = useParallelMergePoolConfigured();
-    final int parallelism = getMergePoolParallelism();
-    // need at least 3 to do 2 layer merge
-    if (parallelism > 2) {
-      return useParallelMergePoolConfigured;
-    }
-    if (useParallelMergePoolConfigured) {
-      log.debug(
-          "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s",
-          parallelism
-      );
-    }
-    return false;
-  }
-
-  @Config(value = "${base_path}.merge.pool.parallelism")
-  public int getMergePoolParallelismConfigured()
-  {
-    return DEFAULT_NUM_THREADS;
+    return computedBufferSizeBytes.get();
   }
 
-  public int getMergePoolParallelism()
+  public int poolCacheMaxCount()
   {
-    int poolParallelismConfigured = getMergePoolParallelismConfigured();
-    if (poolParallelismConfigured != DEFAULT_NUM_THREADS) {
-      return poolParallelismConfigured;
-    } else {
-      // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5
-      return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
-    }
+    return buffer.getPoolCacheMaxCount();
   }
 
-  @Config(value = "${base_path}.merge.pool.awaitShutdownMillis")
-  public long getMergePoolAwaitShutdownMillis()
+  public int getNumInitalBuffersForIntermediatePool()
   {
-    return DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS;
+    return buffer.getPoolCacheInitialCount();
   }
 
-  @Config(value = "${base_path}.merge.pool.defaultMaxQueryParallelism")
-  public int getMergePoolDefaultMaxQueryParallelism()
+  @Override
+  public double skipValueRangeIndexScale()
   {
-    // assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores
-    return (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1);
+    return indexes.getSkipValueRangeIndexScale();
   }
 
-  @Config(value = "${base_path}.merge.task.targetRunTimeMillis")
-  public int getMergePoolTargetTaskRunTimeMillis()
+  @Override
+  public double skipValuePredicateIndexScale()
   {
-    return ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS;
+    return indexes.getSkipValuePredicateIndexScale();
   }
 
-  @Config(value = "${base_path}.merge.task.initialYieldNumRows")
-  public int getMergePoolTaskInitialYieldRows()
+  public boolean isNumThreadsConfigured()
   {
-    return ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS;
+    return numThreadsConfigured;
   }
 
-  @Config(value = "${base_path}.merge.task.smallBatchNumRows")
-  public int getMergePoolSmallBatchRows()
+  public boolean isNumMergeBuffersConfigured()
   {
-    return ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS;
+    return numMergeBuffersConfigured;
   }
 }
 
diff --git a/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java
new file mode 100644
index 0000000000..6afa20418e
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingIndexesConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.segment.column.ColumnConfig;
+
+import javax.annotation.Nullable;
+
+public class DruidProcessingIndexesConfig
+{
+  @JsonProperty
+  private final double skipValueRangeIndexScale;
+
+  @JsonProperty
+  private final double skipValuePredicateIndexScale;
+
+  @JsonCreator
+  public DruidProcessingIndexesConfig(
+      @JsonProperty("skipValueRangeIndexScale") @Nullable Double skipValueRangeIndexScale,
+      @JsonProperty("skipValuePredicateIndexScale") @Nullable Double skipValuePredicateIndexScale
+  )
+  {
+    this.skipValueRangeIndexScale = Configs.valueOrDefault(
+        skipValueRangeIndexScale,
+        ColumnConfig.DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE
+    );
+    this.skipValuePredicateIndexScale = Configs.valueOrDefault(
+        skipValuePredicateIndexScale,
+        ColumnConfig.DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE
+    );
+  }
+
+  public DruidProcessingIndexesConfig()
+  {
+    this(null, null);
+  }
+
+  public double getSkipValueRangeIndexScale()
+  {
+    return skipValueRangeIndexScale;
+  }
+
+  public double getSkipValuePredicateIndexScale()
+  {
+    return skipValuePredicateIndexScale;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
index 3a9b8818cd..b61a78a7c9 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java
@@ -159,6 +159,7 @@ public class StringFirstLastUtils
     Long timeValue = copyBuffer.getLong();
     int stringSizeBytes = copyBuffer.getInt();
 
+
     if (stringSizeBytes >= 0) {
       byte[] valueBytes = new byte[stringSizeBytes];
       copyBuffer.get(valueBytes, 0, stringSizeBytes);
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index f2d57a5517..dd0ac9ab11 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -79,6 +79,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -276,6 +277,10 @@ public class IndexIO
             throw notEqualValidationException(dim1Name, vals1, vals2);
           }
         }
+      } else if (vals1 instanceof Object[]) {
+        if (!Arrays.deepEquals((Object[]) vals1, (Object[]) vals2)) {
+          throw notEqualValidationException(dim1Name, vals1, vals2);
+        }
       } else {
         if (!Objects.equals(vals1, vals2)) {
           throw notEqualValidationException(dim1Name, vals1, vals2);
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
index ae5ca7ff8a..62934d3d34 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnConfig.java
@@ -26,9 +26,16 @@ import org.apache.druid.segment.index.semantic.NumericRangeIndexes;
 
 public interface ColumnConfig
 {
-  ColumnConfig DEFAULT = new ColumnConfig()
-  {
-  };
+  /**
+   * this value was chosen testing bound filters on double columns with a variety of ranges at which this ratio
+   * of number of bitmaps compared to total number of rows appeared to be around the threshold where indexes stopped
+   * performing consistently faster than a full scan + value matcher
+   */
+  double DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE = 0.08;
+
+  double DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE = 0.08;
+
+  ColumnConfig DEFAULT = new ColumnConfig() {};
 
   ColumnConfig ALWAYS_USE_INDEXES = new ColumnConfig()
   {
@@ -73,10 +80,7 @@ public interface ColumnConfig
    */
   default double skipValueRangeIndexScale()
   {
-    // this value was chosen testing bound filters on double columns with a variety of ranges at which this ratio
-    // of number of bitmaps compared to total number of rows appeared to be around the threshold where indexes stopped
-    // performing consistently faster than a full scan + value matcher
-    return 0.08;
+    return DEFAULT_SKIP_VALUE_RANGE_INDEX_SCALE;
   }
 
   /**
@@ -109,6 +113,6 @@ public interface ColumnConfig
    */
   default double skipValuePredicateIndexScale()
   {
-    return 0.08;
+    return DEFAULT_SKIP_VALUE_PREDICATE_INDEX_SCALE;
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
index 9b87e1cd7b..ceac1e5564 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java
@@ -32,10 +32,10 @@ import java.util.concurrent.ScheduledExecutorService;
 
 public class BasicMonitorSchedulerTest
 {
-  private final MonitorSchedulerConfig config = new MonitorSchedulerConfig()
+  private final DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig()
   {
     @Override
-    public Duration getEmitterPeriod()
+    public Duration getEmissionDuration()
     {
       return Duration.millis(5);
     }
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
index d7ec354ff1..712fcbfbb6 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java
@@ -93,7 +93,7 @@ public class ClockDriftSafeMonitorSchedulerTest
     ExecutorService executor = Mockito.mock(ExecutorService.class);
 
     final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
-        Mockito.mock(MonitorSchedulerConfig.class),
+        Mockito.mock(DruidMonitorSchedulerConfig.class),
         Mockito.mock(ServiceEmitter.class),
         ImmutableList.of(monitor1, monitor2),
         CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
@@ -153,8 +153,8 @@ public class ClockDriftSafeMonitorSchedulerTest
 
     Monitor monitor = Mockito.mock(Monitor.class);
 
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
@@ -217,8 +217,8 @@ public class ClockDriftSafeMonitorSchedulerTest
 
     Monitor monitor = Mockito.mock(Monitor.class);
 
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
         config,
@@ -248,8 +248,8 @@ public class ClockDriftSafeMonitorSchedulerTest
     Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class)))
            .thenThrow(new RuntimeException("Test throwing exception while monitoring"));
 
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     CountDownLatch latch = new CountDownLatch(1);
     AtomicBoolean monitorResultHolder = new AtomicBoolean(false);
@@ -310,8 +310,8 @@ public class ClockDriftSafeMonitorSchedulerTest
   {
     ExecutorService executor = Mockito.mock(ExecutorService.class);
     Monitor monitor = Mockito.mock(Monitor.class);
-    MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
-    Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+    DruidMonitorSchedulerConfig config = Mockito.mock(DruidMonitorSchedulerConfig.class);
+    Mockito.when(config.getEmissionDuration()).thenReturn(new org.joda.time.Duration(1000L));
 
     CountDownLatch latch = new CountDownLatch(1);
     Mockito.doAnswer(new Answer<Future<?>>()
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
index 40afa8349b..9f9a67270b 100644
--- a/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -34,10 +34,10 @@ public class MonitorSchedulerTest
   @Test
   public void testMonitorAndStopOnRemove() throws IOException
   {
-    MonitorSchedulerConfig infiniteFlushDelayConfig = new MonitorSchedulerConfig()
+    DruidMonitorSchedulerConfig infiniteFlushDelayConfig = new DruidMonitorSchedulerConfig()
     {
       @Override
-      public Duration getEmitterPeriod()
+      public Duration getEmissionDuration()
       {
         return Duration.millis(Long.MAX_VALUE);
       }
diff --git a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
index 8d9f713728..e19fbf7554 100644
--- a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
@@ -19,22 +19,18 @@
 
 package org.apache.druid.query;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Guice;
 import com.google.inject.Injector;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.config.Config;
+import com.google.inject.ProvisionException;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.StartupInjectorBuilder;
 import org.apache.druid.utils.JvmUtils;
 import org.apache.druid.utils.RuntimeInfo;
-import org.hamcrest.CoreMatchers;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.skife.config.ConfigurationObjectFactory;
 
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -46,47 +42,15 @@ public class DruidProcessingConfigTest
   private static final long DIRECT_SIZE = BUFFER_SIZE * (3L + 2L + 1L);
   private static final long HEAP_SIZE = BUFFER_SIZE * 2L;
 
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize)
-  {
-    return makeInjector(numProcessors, directMemorySize, heapSize, new Properties(), null);
-  }
-
   @AfterClass
   public static void teardown()
   {
     JvmUtils.resetTestsToDefaultRuntimeInfo();
   }
 
-  private static Injector makeInjector(
-      int numProcessors,
-      long directMemorySize,
-      long heapSize,
-      Properties props,
-      Map<String, String> replacements
-  )
-  {
-    return Guice.createInjector(
-        binder -> {
-          binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize));
-          binder.requestStaticInjection(JvmUtils.class);
-          ConfigurationObjectFactory factory = Config.createFactory(props);
-          DruidProcessingConfig config;
-          if (replacements != null) {
-            config = factory.buildWithReplacements(
-                DruidProcessingConfig.class,
-                replacements
-            );
-          } else {
-            config = factory.build(DruidProcessingConfig.class);
-          }
-          binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
-          binder.bind(DruidProcessingConfig.class).toInstance(config);
-        }
-    );
-  }
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
 
   @Test
   public void testDefaultsMultiProcessor()
@@ -124,7 +88,7 @@ public class DruidProcessingConfigTest
     DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
 
     Assert.assertEquals(
-        DruidProcessingConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES,
+        DruidProcessingBufferConfig.MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES,
         config.intermediateComputeSizeBytes()
     );
   }
@@ -144,8 +108,7 @@ public class DruidProcessingConfigTest
         NUM_PROCESSORS,
         DIRECT_SIZE,
         HEAP_SIZE,
-        props,
-        ImmutableMap.of("base_path", "druid.processing")
+        props
     );
     DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
 
@@ -164,14 +127,19 @@ public class DruidProcessingConfigTest
     Properties props = new Properties();
     props.setProperty("druid.processing.buffer.sizeBytes", "-1");
 
-    expectedException.expectCause(CoreMatchers.isA(IAE.class));
-
     Injector injector = makeInjector(
         NUM_PROCESSORS,
         DIRECT_SIZE,
         HEAP_SIZE,
-        props,
-        ImmutableMap.of("base_path", "druid.processing")
+        props
+    );
+    Throwable t = Assert.assertThrows(
+        ProvisionException.class,
+        () -> injector.getInstance(DruidProcessingConfig.class)
+    );
+    Assert.assertTrue(
+        t.getMessage()
+         .contains("Cannot construct instance of `org.apache.druid.java.util.common.HumanReadableBytes`, problem: Invalid format of number: -1. Negative value is not allowed.")
     );
   }
 
@@ -184,13 +152,35 @@ public class DruidProcessingConfigTest
         NUM_PROCESSORS,
         DIRECT_SIZE,
         HEAP_SIZE,
-        props,
-        ImmutableMap.of("base_path", "druid.processing")
+        props
+    );
+    Throwable t = Assert.assertThrows(
+        ProvisionException.class,
+        () -> injector.getInstance(DruidProcessingConfig.class)
     );
-    DruidProcessingConfig config = injector.getInstance(DruidProcessingConfig.class);
 
-    expectedException.expectMessage("druid.processing.buffer.sizeBytes must be less than 2GiB");
-    config.intermediateComputeSizeBytes();
+    Assert.assertTrue(t.getMessage().contains("druid.processing.buffer.sizeBytes must be less than 2GiB"));
+  }
+
+  private static Injector makeInjector(int numProcessors, long directMemorySize, long heapSize)
+  {
+    return makeInjector(numProcessors, directMemorySize, heapSize, new Properties());
+  }
+  private static Injector makeInjector(
+      int numProcessors,
+      long directMemorySize,
+      long heapSize,
+      Properties props
+  )
+  {
+    Injector injector = new StartupInjectorBuilder().withProperties(props).add(
+        binder -> {
+          binder.bind(RuntimeInfo.class).toInstance(new MockRuntimeInfo(numProcessors, directMemorySize, heapSize));
+          binder.requestStaticInjection(JvmUtils.class);
+          JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+        }
+    ).build();
+    return injector;
   }
 
   public static class MockRuntimeInfo extends RuntimeInfo
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 5c0e5efb7e..19df276344 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -55,9 +55,9 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.CacheStrategy;
-import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
@@ -75,7 +75,6 @@ import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.spec.QuerySegmentSpec;
-import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.server.QueryResource;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -124,10 +123,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
   private final CachePopulator cachePopulator;
   private final CacheConfig cacheConfig;
   private final DruidHttpClientConfig httpClientConfig;
-  private final DruidProcessingConfig processingConfig;
+  private final BrokerParallelMergeConfig parallelMergeConfig;
   private final ForkJoinPool pool;
   private final QueryScheduler scheduler;
-  private final JoinableFactoryWrapper joinableFactoryWrapper;
   private final ServiceEmitter emitter;
 
   @Inject
@@ -139,10 +137,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
       CachePopulator cachePopulator,
       CacheConfig cacheConfig,
       @Client DruidHttpClientConfig httpClientConfig,
-      DruidProcessingConfig processingConfig,
+      BrokerParallelMergeConfig parallelMergeConfig,
       @Merging ForkJoinPool pool,
       QueryScheduler scheduler,
-      JoinableFactoryWrapper joinableFactoryWrapper,
       ServiceEmitter emitter
   )
   {
@@ -153,10 +150,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
     this.cachePopulator = cachePopulator;
     this.cacheConfig = cacheConfig;
     this.httpClientConfig = httpClientConfig;
-    this.processingConfig = processingConfig;
+    this.parallelMergeConfig = parallelMergeConfig;
     this.pool = pool;
     this.scheduler = scheduler;
-    this.joinableFactoryWrapper = joinableFactoryWrapper;
     this.emitter = emitter;
 
     if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
@@ -386,7 +382,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
     {
       BinaryOperator<T> mergeFn = toolChest.createMergeFn(query);
       final QueryContext queryContext = query.context();
-      if (processingConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) {
+      if (parallelMergeConfig.useParallelMergePool() && queryContext.getEnableParallelMerges() && mergeFn != null) {
         return new ParallelMergeCombiningSequence<>(
             pool,
             sequencesByInterval,
@@ -395,10 +391,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
             queryContext.hasTimeout(),
             queryContext.getTimeout(),
             queryContext.getPriority(),
-            queryContext.getParallelMergeParallelism(processingConfig.getMergePoolDefaultMaxQueryParallelism()),
-            queryContext.getParallelMergeInitialYieldRows(processingConfig.getMergePoolTaskInitialYieldRows()),
-            queryContext.getParallelMergeSmallBatchRows(processingConfig.getMergePoolSmallBatchRows()),
-            processingConfig.getMergePoolTargetTaskRunTimeMillis(),
+            queryContext.getParallelMergeParallelism(parallelMergeConfig.getDefaultMaxQueryParallelism()),
+            queryContext.getParallelMergeInitialYieldRows(parallelMergeConfig.getInitialYieldNumRows()),
+            queryContext.getParallelMergeSmallBatchRows(parallelMergeConfig.getSmallBatchNumRows()),
+            parallelMergeConfig.getTargetRunTimeMillis(),
             reportMetrics -> {
               QueryMetrics<?> queryMetrics = queryPlus.getQueryMetrics();
               if (queryMetrics != null) {
diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index d5f18dd178..77572bae47 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -39,13 +39,14 @@ import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.offheap.OffheapBufferGenerator;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.ExecutorServiceMonitor;
 import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.server.metrics.MetricsModule;
 import org.apache.druid.utils.JvmUtils;
 
@@ -67,7 +68,9 @@ public class BrokerProcessingModule implements Module
   @Override
   public void configure(Binder binder)
   {
-    binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
+    JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class);
+    JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+    binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
     MetricsModule.register(binder, ExecutorServiceMonitor.class);
   }
 
@@ -133,14 +136,14 @@ public class BrokerProcessingModule implements Module
 
   @Provides
   @ManageLifecycle
-  public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config)
+  public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(BrokerParallelMergeConfig config)
   {
     return new LifecycleForkJoinPoolProvider(
-        config.getMergePoolParallelism(),
+        config.getParallelism(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
         true,
-        config.getMergePoolAwaitShutdownMillis()
+        config.getAwaitShutdownMillis()
     );
   }
 
diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index b02ae366f5..56a0fd0ede 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -38,7 +38,6 @@ import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.offheap.OffheapBufferGenerator;
@@ -47,13 +46,13 @@ import org.apache.druid.query.ExecutorServiceMonitor;
 import org.apache.druid.query.MetricsEmittingQueryProcessingPool;
 import org.apache.druid.query.PrioritizedExecutorService;
 import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.server.metrics.MetricsModule;
 import org.apache.druid.utils.JvmUtils;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
 
 /**
  */
@@ -64,7 +63,8 @@ public class DruidProcessingModule implements Module
   @Override
   public void configure(Binder binder)
   {
-    binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
+    JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+    binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
     MetricsModule.register(binder, ExecutorServiceMonitor.class);
   }
 
@@ -135,26 +135,6 @@ public class DruidProcessingModule implements Module
     );
   }
 
-  @Provides
-  @ManageLifecycle
-  public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config)
-  {
-    return new LifecycleForkJoinPoolProvider(
-        config.getMergePoolParallelism(),
-        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
-        (t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
-        true,
-        config.getMergePoolAwaitShutdownMillis()
-    );
-  }
-
-  @Provides
-  @Merging
-  public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider poolProvider)
-  {
-    return poolProvider.getPool();
-  }
-
   private void verifyDirectMemory(DruidProcessingConfig config)
   {
     try {
diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java
similarity index 58%
rename from server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java
rename to server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java
index 7e83817de2..3d44cf23f7 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingConfigModule.java
+++ b/server/src/main/java/org/apache/druid/guice/LegacyBrokerParallelMergeConfigModule.java
@@ -19,17 +19,24 @@
 
 package org.apache.druid.guice;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.inject.Binder;
 import com.google.inject.Module;
-import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.LegacyBrokerParallelMergeConfig;
 
-public class DruidProcessingConfigModule implements Module
+/**
+ * Backwards compatibility for runtime.properties for Druid 27 and older to make deprecated config paths of
+ * {@link LegacyBrokerParallelMergeConfig} still work for Druid 28.
+ * {@link org.apache.druid.query.BrokerParallelMergeConfig} has replaced these configs, and will warn when these
+ * deprecated paths are configured. This module should be removed in Druid 29, along with
+ * {@link LegacyBrokerParallelMergeConfig} as well as the config-magic library that makes it work.
+ */
+@Deprecated
+public class LegacyBrokerParallelMergeConfigModule implements Module
 {
 
   @Override
   public void configure(Binder binder)
   {
-    ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing"));
+    ConfigProvider.bind(binder, LegacyBrokerParallelMergeConfig.class);
   }
 }
diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
index 59af28a4f5..3b68289c6d 100644
--- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
@@ -29,12 +29,12 @@ import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.ExecutorServiceMonitor;
 import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.server.metrics.MetricsModule;
 
 import java.nio.ByteBuffer;
@@ -53,7 +53,8 @@ public class RouterProcessingModule implements Module
   @Override
   public void configure(Binder binder)
   {
-    binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
+    JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
+    binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
     MetricsModule.register(binder, ExecutorServiceMonitor.class);
   }
 
@@ -61,8 +62,8 @@ public class RouterProcessingModule implements Module
   @ManageLifecycle
   public QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig config)
   {
-    if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) {
-      log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured());
+    if (config.isNumThreadsConfigured()) {
+      log.warn("numThreads[%d] configured, that is ignored on Router", config.getNumThreads());
     }
     return new ForwardingQueryProcessingPool(Execs.dummy());
   }
@@ -80,10 +81,10 @@ public class RouterProcessingModule implements Module
   @Merging
   public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig config)
   {
-    if (config.getNumMergeBuffersConfigured() != DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS) {
-      log.error(
+    if (config.isNumMergeBuffersConfigured()) {
+      log.warn(
           "numMergeBuffers[%d] configured, that is ignored on Router",
-          config.getNumMergeBuffersConfigured()
+          config.getNumMergeBuffers()
       );
     }
     return DummyBlockingPool.instance();
diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
index 842abd0e53..cc4e0b1e33 100644
--- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
+++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
@@ -56,7 +56,6 @@ public class StorageNodeModule implements Module
     JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
     JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
     bindLocationSelectorStrategy(binder);
-
     binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null));
     binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
   }
diff --git a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
index de4f3abbc8..d27fdf75b1 100644
--- a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
+++ b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
@@ -26,7 +26,6 @@ import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.AnnouncerModule;
 import org.apache.druid.guice.CoordinatorDiscoveryModule;
 import org.apache.druid.guice.DruidInjectorBuilder;
-import org.apache.druid.guice.DruidProcessingConfigModule;
 import org.apache.druid.guice.DruidSecondaryModule;
 import org.apache.druid.guice.ExpressionModule;
 import org.apache.druid.guice.ExtensionsModule;
@@ -112,7 +111,6 @@ public class CoreInjectorBuilder extends DruidInjectorBuilder
         new MetricsModule(),
         new SegmentWriteOutMediumModule(),
         new ServerModule(),
-        new DruidProcessingConfigModule(),
         new StorageNodeModule(),
         new JettyServerModule(),
         new ExpressionModule(),
diff --git a/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java
new file mode 100644
index 0000000000..1f8d25a10c
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/BrokerParallelMergeConfig.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+
+public class BrokerParallelMergeConfig
+{
+  private static final Logger LOG = new Logger(BrokerParallelMergeConfig.class);
+  public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
+
+  @JsonProperty
+  private final boolean useParallelMergePool;
+  @JsonProperty
+  private final int parallelism;
+  @JsonProperty
+  private final long awaitShutdownMillis;
+  @JsonProperty
+  private final int defaultMaxQueryParallelism;
+  @JsonProperty
+  private final int targetRunTimeMillis;
+  @JsonProperty
+  private final int initialYieldNumRows;
+  @JsonProperty
+  private final int smallBatchNumRows;
+
+  @JsonCreator
+  public BrokerParallelMergeConfig(
+      @JsonProperty("useParallelMergePool") @Nullable Boolean useParallelMergePool,
+      @JsonProperty("parallelism") @Nullable Integer parallelism,
+      @JsonProperty("awaitShutdownMillis") @Nullable Long awaitShutdownMillis,
+      @JsonProperty("defaultMaxQueryParallelism") @Nullable Integer defaultMaxQueryParallelism,
+      @JsonProperty("targetRunTimeMillis") @Nullable Integer targetRunTimeMillis,
+      @JsonProperty("initialYieldNumRows") @Nullable Integer initialYieldNumRows,
+      @JsonProperty("smallBatchNumRows") @Nullable Integer smallBatchNumRows,
+      @JacksonInject LegacyBrokerParallelMergeConfig oldConfig
+  )
+  {
+    if (parallelism == null) {
+      if (oldConfig == null || oldConfig.getMergePoolParallelism() == null) {
+        // assume 2 hyper-threads per core, so that this value is probably by default the number
+        // of physical cores * 1.5
+        this.parallelism = (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.pool.parallelism",
+            "druid.processing.merge.parallelism"
+        );
+        this.parallelism = oldConfig.getMergePoolParallelism();
+      }
+    } else {
+      this.parallelism = parallelism;
+    }
+
+    // need at least 3 to do 2 layer merge
+    if (this.parallelism > 2) {
+      this.useParallelMergePool = useParallelMergePool == null || useParallelMergePool;
+    } else {
+      if (useParallelMergePool == null || useParallelMergePool) {
+        LOG.debug(
+            "Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s",
+            parallelism
+        );
+      }
+      this.useParallelMergePool = false;
+    }
+
+    if (awaitShutdownMillis == null) {
+      if (oldConfig == null || oldConfig.getMergePoolAwaitShutdownMillis() == null) {
+        this.awaitShutdownMillis = DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.pool.awaitShutdownMillis",
+            "druid.processing.merge.awaitShutdownMillis"
+        );
+        this.awaitShutdownMillis = oldConfig.getMergePoolAwaitShutdownMillis();
+      }
+    } else {
+      this.awaitShutdownMillis = awaitShutdownMillis;
+    }
+
+    if (defaultMaxQueryParallelism == null) {
+      if (oldConfig == null || oldConfig.getMergePoolDefaultMaxQueryParallelism() == null) {
+        this.defaultMaxQueryParallelism = (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1);
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.pool.defaultMaxQueryParallelism",
+            "druid.processing.merge.defaultMaxQueryParallelism"
+        );
+        this.defaultMaxQueryParallelism = oldConfig.getMergePoolDefaultMaxQueryParallelism();
+      }
+    } else {
+      this.defaultMaxQueryParallelism = defaultMaxQueryParallelism;
+    }
+
+    if (targetRunTimeMillis == null) {
+      if (oldConfig == null || oldConfig.getMergePoolTargetTaskRunTimeMillis() == null) {
+        this.targetRunTimeMillis = ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.task.targetRunTimeMillis",
+            "druid.processing.merge.targetRunTimeMillis"
+        );
+        this.targetRunTimeMillis = oldConfig.getMergePoolTargetTaskRunTimeMillis();
+      }
+    } else {
+      this.targetRunTimeMillis = targetRunTimeMillis;
+    }
+
+    if (initialYieldNumRows == null) {
+      if (oldConfig == null || oldConfig.getMergePoolTaskInitialYieldRows() == null) {
+        this.initialYieldNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.task.initialYieldNumRows",
+            "druid.processing.merge.initialYieldNumRows"
+        );
+        this.initialYieldNumRows = oldConfig.getMergePoolTaskInitialYieldRows();
+      }
+    } else {
+      this.initialYieldNumRows = initialYieldNumRows;
+    }
+
+    if (smallBatchNumRows == null) {
+      if (oldConfig == null || oldConfig.getMergePoolSmallBatchRows() == null) {
+        this.smallBatchNumRows = ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS;
+      } else {
+        warnDeprecated(
+            "druid.processing.merge.task.smallBatchNumRows",
+            "druid.processing.merge.smallBatchNumRows"
+        );
+        this.smallBatchNumRows = oldConfig.getMergePoolSmallBatchRows();
+      }
+    } else {
+      this.smallBatchNumRows = smallBatchNumRows;
+    }
+  }
+
+  @VisibleForTesting
+  public BrokerParallelMergeConfig()
+  {
+    this(null, null, null, null, null, null, null, null);
+  }
+
+  public boolean useParallelMergePool()
+  {
+    return useParallelMergePool;
+  }
+
+  public int getParallelism()
+  {
+    return parallelism;
+  }
+
+  public long getAwaitShutdownMillis()
+  {
+    return awaitShutdownMillis;
+  }
+
+  public int getDefaultMaxQueryParallelism()
+  {
+    return defaultMaxQueryParallelism;
+  }
+
+  public int getTargetRunTimeMillis()
+  {
+    return targetRunTimeMillis;
+  }
+
+  public int getInitialYieldNumRows()
+  {
+    return initialYieldNumRows;
+  }
+
+  public int getSmallBatchNumRows()
+  {
+    return smallBatchNumRows;
+  }
+
+  private static void warnDeprecated(String oldPath, String newPath)
+  {
+    LOG.warn(
+        "Using deprecated config [%s] which has been replace by [%s]. This path is deprecated and will be "
+        + "removed in a future release, please transition to using [%s]",
+        oldPath,
+        newPath,
+        newPath
+    );
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java
new file mode 100644
index 0000000000..25b11ab63d
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/query/LegacyBrokerParallelMergeConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.skife.config.Config;
+
+import javax.annotation.Nullable;
+
+/**
+ * Backwards compatibility for Druid 27 and older runtime.properties configs, replaced by
+ * {@link BrokerParallelMergeConfig} in Druid 28. This config should be removed in Druid 29.
+ */
+@Deprecated
+public abstract class LegacyBrokerParallelMergeConfig
+{
+  @Nullable
+  @Config(value = "druid.processing.merge.pool.parallelism")
+  public Integer getMergePoolParallelism()
+  {
+    return null;
+  }
+
+  @Nullable
+  @Config(value = "druid.processing.merge.pool.awaitShutdownMillis")
+  public Long getMergePoolAwaitShutdownMillis()
+  {
+    return null;
+  }
+
+  @Nullable
+  @Config(value = "druid.processing.merge.pool.defaultMaxQueryParallelism")
+  public Integer getMergePoolDefaultMaxQueryParallelism()
+  {
+    return null;
+  }
+
+  @Nullable
+  @Config(value = "druid.processing.merge.task.targetRunTimeMillis")
+  public Integer getMergePoolTargetTaskRunTimeMillis()
+  {
+    return null;
+  }
+
+  @Nullable
+  @Config(value = "druid.processing.merge.task.initialYieldNumRows")
+  public Integer getMergePoolTaskInitialYieldRows()
+  {
+    return null;
+  }
+
+  @Nullable
+  @Config(value = "druid.processing.merge.task.smallBatchNumRows")
+  public Integer getMergePoolSmallBatchRows()
+  {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java
deleted file mode 100644
index 1f6bc47dda..0000000000
--- a/server/src/main/java/org/apache/druid/segment/realtime/DbSegmentPublisherConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.realtime;
-
-import org.skife.config.Config;
-
-public abstract class DbSegmentPublisherConfig
-{
-  @Config("druid.metadata.storage.tables.segments")
-  public abstract String getSegmentTable();
-}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index 46c0fc90d8..d486ae4fb8 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
 import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
+import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.java.util.metrics.JvmCpuMonitor;
 import org.apache.druid.java.util.metrics.JvmMonitor;
 import org.apache.druid.java.util.metrics.JvmThreadsMonitor;
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index f77baa4812..89cb2d76f8 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -38,7 +38,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
@@ -47,7 +47,6 @@ import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -318,16 +317,23 @@ public class CachingClusteredClientFunctionalityTest
             return 0L;
           }
         },
-        new DruidProcessingConfig()
+        new BrokerParallelMergeConfig()
         {
           @Override
-          public String getFormatString()
+          public boolean useParallelMergePool()
           {
-            return null;
+            return true;
+          }
+
+          @Override
+          public int getParallelism()
+          {
+            // fixed so same behavior across all test environments
+            return 4;
           }
 
           @Override
-          public int getMergePoolParallelism()
+          public int getDefaultMaxQueryParallelism()
           {
             // fixed so same behavior across all test environments
             return 4;
@@ -335,7 +341,6 @@ public class CachingClusteredClientFunctionalityTest
         },
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
         new NoopServiceEmitter()
     );
   }
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
index 95fce2060e..6270f36854 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java
@@ -34,8 +34,8 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.TestSequence;
 import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.DataSource;
-import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
@@ -49,7 +49,6 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.coordination.ServerManagerTest;
 import org.apache.druid.server.coordination.ServerType;
@@ -136,10 +135,9 @@ public class CachingClusteredClientPerfTest
         Mockito.mock(CachePopulator.class),
         new CacheConfig(),
         Mockito.mock(DruidHttpClientConfig.class),
-        Mockito.mock(DruidProcessingConfig.class),
+        Mockito.mock(BrokerParallelMergeConfig.class),
         ForkJoinPool.commonPool(),
         queryScheduler,
-        JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
         new NoopServiceEmitter()
     );
 
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index e083e074ac..0ebd441360 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -67,8 +67,8 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.guava.nary.TrinaryFn;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.BySegmentResultValueClass;
-import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.FluentQueryRunner;
@@ -121,7 +121,6 @@ import org.apache.druid.query.topn.TopNQueryConfig;
 import org.apache.druid.query.topn.TopNQueryQueryToolChest;
 import org.apache.druid.query.topn.TopNResultValue;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.ServerTestHelper;
 import org.apache.druid.server.coordination.ServerType;
@@ -2829,19 +2828,26 @@ public class CachingClusteredClientTest
             return 0L;
           }
         },
-        new DruidProcessingConfig()
+        new BrokerParallelMergeConfig()
         {
           @Override
-          public String getFormatString()
+          public boolean useParallelMergePool()
           {
-            return null;
+            return true;
+          }
+
+          @Override
+          public int getParallelism()
+          {
+            // fixed so same behavior across all test environments
+            return 1;
           }
 
           @Override
-          public int getMergePoolParallelism()
+          public int getDefaultMaxQueryParallelism()
           {
             // fixed so same behavior across all test environments
-            return 4;
+            return 1;
           }
         },
         ForkJoinPool.commonPool(),
@@ -2851,7 +2857,6 @@ public class CachingClusteredClientTest
             NoQueryLaningStrategy.INSTANCE,
             new ServerConfig()
         ),
-        JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
         new NoopServiceEmitter()
     );
   }
diff --git a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
index 3ce5db12a3..7fca81aaea 100644
--- a/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/BrokerProcessingModuleTest.java
@@ -28,7 +28,10 @@ import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulator;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.initialization.Initialization;
+import org.apache.druid.java.util.common.config.Config;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.LegacyBrokerParallelMergeConfig;
 import org.apache.druid.utils.JvmUtils;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -37,14 +40,14 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.skife.config.ConfigurationObjectFactory;
+
+import java.util.Properties;
 
 
 @RunWith(MockitoJUnitRunner.class)
 public class BrokerProcessingModuleTest
 {
-  private static final boolean INJECT_SERVER_TYPE_CONFIG = true;
-  @Mock
-  private DruidProcessingConfig druidProcessingConfig;
   private Injector injector;
   private BrokerProcessingModule target;
   @Mock
@@ -56,12 +59,13 @@ public class BrokerProcessingModuleTest
   public void setUp()
   {
     target = new BrokerProcessingModule();
-    injector = makeInjector(INJECT_SERVER_TYPE_CONFIG);
+    injector = makeInjector(new Properties());
   }
 
   @Test
   public void testIntermediateResultsPool()
   {
+    DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class);
     target.getIntermediateResultsPool(druidProcessingConfig);
   }
 
@@ -69,23 +73,30 @@ public class BrokerProcessingModuleTest
   @Test
   public void testMergeBufferPool()
   {
+    DruidProcessingConfig druidProcessingConfig = injector.getInstance(DruidProcessingConfig.class);
     target.getMergeBufferPool(druidProcessingConfig);
   }
 
   @Test
   public void testMergeProcessingPool()
   {
-    DruidProcessingConfig config = new DruidProcessingConfig()
-    {
-      @Override
-      public String getFormatString()
-      {
-        return "processing-test-%s";
-      }
-    };
-    DruidProcessingModule module = new DruidProcessingModule();
+    BrokerParallelMergeConfig config = injector.getInstance(BrokerParallelMergeConfig.class);
+    BrokerProcessingModule module = new BrokerProcessingModule();
     module.getMergeProcessingPoolProvider(config);
-    config.getNumInitalBuffersForIntermediatePool();
+  }
+
+  @Test
+  public void testMergeProcessingPoolLegacyConfigs()
+  {
+    Properties props = new Properties();
+    props.put("druid.processing.merge.pool.parallelism", "10");
+    props.put("druid.processing.merge.pool.defaultMaxQueryParallelism", "10");
+    props.put("druid.processing.merge.task.targetRunTimeMillis", "1000");
+    Injector gadget = makeInjector(props);
+    BrokerParallelMergeConfig config = gadget.getInstance(BrokerParallelMergeConfig.class);
+    Assert.assertEquals(10, config.getParallelism());
+    Assert.assertEquals(10, config.getDefaultMaxQueryParallelism());
+    Assert.assertEquals(1000, config.getTargetRunTimeMillis());
   }
 
   @Test
@@ -93,7 +104,6 @@ public class BrokerProcessingModuleTest
   {
     CachePopulator cachePopulator = injector.getInstance(CachePopulator.class);
     Assert.assertNotNull(cachePopulator);
-
   }
 
   @Test(expected = ProvisionException.class)
@@ -107,43 +117,40 @@ public class BrokerProcessingModuleTest
     catch (UnsupportedOperationException e) {
       Assume.assumeNoException(e);
     }
+    Properties props = new Properties();
+    props.setProperty("druid.processing.buffer.sizeBytes", "3GiB");
+    Injector injector1 = makeInjector(props);
 
+    DruidProcessingConfig processingBufferConfig = injector1.getInstance(DruidProcessingConfig.class);
     BrokerProcessingModule module = new BrokerProcessingModule();
-    module.getMergeBufferPool(new DruidProcessingConfig()
-    {
-      @Override
-      public String getFormatString()
-      {
-        return "test";
-      }
-
-      @Override
-      public int intermediateComputeSizeBytes()
-      {
-        return Integer.MAX_VALUE;
-      }
-    });
+    module.getMergeBufferPool(processingBufferConfig);
   }
 
-  private Injector makeInjector(boolean withServerTypeConfig)
+  private Injector makeInjector(Properties props)
   {
-    return Initialization.makeInjectorWithModules(
-        GuiceInjectors.makeStartupInjector(), (ImmutableList.of(Modules.override(
-            (binder) -> {
-              binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
-              binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
-              binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
-              binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig);
-            },
-            target
-        ).with(
-            (binder) -> {
+
+    Injector injector = Initialization.makeInjectorWithModules(
+        GuiceInjectors.makeStartupInjector(),
+        ImmutableList.of(
+            Modules.override(
+                (binder) -> {
+                  binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
+                  binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
+                  binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
+                  binder.bind(Properties.class).toInstance(props);
+                  ConfigurationObjectFactory factory = Config.createFactory(props);
+                  LegacyBrokerParallelMergeConfig legacyConfig = factory.build(LegacyBrokerParallelMergeConfig.class);
+                  binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
+                  binder.bind(LegacyBrokerParallelMergeConfig.class).toInstance(legacyConfig);
+                },
+                target
+            ).with((binder) -> {
               binder.bind(CachePopulatorStats.class).toInstance(cachePopulatorStats);
               binder.bind(CacheConfig.class).toInstance(cacheConfig);
-            }
+            })
         )
-        )));
+    );
+    return injector;
   }
-
 }
 
diff --git a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java
index b51f2c1485..8fad8924bf 100644
--- a/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/QueryableModuleTest.java
@@ -77,7 +77,7 @@ public class QueryableModuleTest
             new JacksonModule(),
             new ConfigModule(),
             new QueryRunnerFactoryModule(),
-            new DruidProcessingConfigModule(),
+            new LegacyBrokerParallelMergeConfigModule(),
             new BrokerProcessingModule(),
             new LifecycleModule(),
             binder -> binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class),
diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
index d07bfdef34..308427d40f 100644
--- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
+++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java
@@ -49,7 +49,6 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
-import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
@@ -108,7 +107,6 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
   {
     conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(
         CLOSER,
-        USE_PARALLEL_MERGE_POOL_CONFIGURED,
         () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD
     );
 
@@ -142,13 +140,9 @@ public abstract class QueryRunnerBasedOnClusteredClientTestBase
         new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0),
         new CacheConfig(),
         new DruidHttpClientConfig(),
-        QueryStackTests.getProcessingConfig(
-            USE_PARALLEL_MERGE_POOL_CONFIGURED,
-            DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
-        ),
+        QueryStackTests.getParallelMergeConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
         ForkJoinPool.commonPool(),
         QueryStackTests.DEFAULT_NOOP_SCHEDULER,
-        JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
         new NoopServiceEmitter()
     );
     servers = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index 1c23edf3b9..69305adc12 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -100,6 +101,9 @@ public class QueryStackTests
       NoQueryLaningStrategy.INSTANCE,
       new ServerConfig()
   );
+
+  public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
+
   private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
   private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
 
@@ -187,10 +191,19 @@ public class QueryStackTests
     );
   }
 
-  public static DruidProcessingConfig getProcessingConfig(
-      boolean useParallelMergePoolConfigured,
-      final int mergeBuffers
+  public static BrokerParallelMergeConfig getParallelMergeConfig(
+      boolean useParallelMergePoolConfigured
   )
+  {
+    return new BrokerParallelMergeConfig() {
+      @Override
+      public boolean useParallelMergePool()
+      {
+        return useParallelMergePoolConfigured;
+      }
+    };
+  }
+  public static DruidProcessingConfig getProcessingConfig(final int mergeBuffers)
   {
     return new DruidProcessingConfig()
     {
@@ -221,12 +234,6 @@ public class QueryStackTests
         }
         return mergeBuffers;
       }
-
-      @Override
-      public boolean useParallelMergePoolConfigured()
-      {
-        return useParallelMergePoolConfigured;
-      }
     };
   }
 
@@ -235,28 +242,18 @@ public class QueryStackTests
    */
   public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer)
   {
-    return createQueryRunnerFactoryConglomerate(closer, true, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD);
-  }
-
-  public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
-      final Closer closer,
-      final Supplier<Integer> minTopNThresholdSupplier
-  )
-  {
-    return createQueryRunnerFactoryConglomerate(closer, true, minTopNThresholdSupplier);
+    return createQueryRunnerFactoryConglomerate(closer, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD);
   }
 
   public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
       final Closer closer,
-      final boolean useParallelMergePoolConfigured,
       final Supplier<Integer> minTopNThresholdSupplier
   )
   {
     return createQueryRunnerFactoryConglomerate(
         closer,
         getProcessingConfig(
-            useParallelMergePoolConfigured,
-            DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
+            DEFAULT_NUM_MERGE_BUFFERS
         ),
         minTopNThresholdSupplier
     );
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 6bbcdf7f3a..dd9c3c3c0b 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -45,6 +45,7 @@ import org.apache.druid.guice.Jerseys;
 import org.apache.druid.guice.JoinableFactoryModule;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LegacyBrokerParallelMergeConfigModule;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.QueryRunnerFactoryModule;
@@ -108,6 +109,7 @@ public class CliBroker extends ServerRunnable
   protected List<? extends Module> getModules()
   {
     return ImmutableList.of(
+        new LegacyBrokerParallelMergeConfigModule(),
         new BrokerProcessingModule(),
         new QueryableModule(),
         new QueryRunnerFactoryModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java
index 566f89d5fd..afbd58ab95 100644
--- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java
+++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java
@@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
@@ -55,7 +54,6 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.DirectQueryProcessingPool;
-import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
@@ -81,7 +79,6 @@ import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.BaseColumn;
-import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
 import org.apache.druid.segment.column.ColumnType;
@@ -733,39 +730,10 @@ public class DumpSegment extends GuiceRunnable
         new DruidProcessingModule(),
         new QueryableModule(),
         new QueryRunnerFactoryModule(),
-        new Module()
-        {
-          @Override
-          public void configure(Binder binder)
-          {
-            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
-            binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
-            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
-            binder.bind(DruidProcessingConfig.class).toInstance(
-                new DruidProcessingConfig()
-                {
-                  @Override
-                  public String getFormatString()
-                  {
-                    return "processing-%s";
-                  }
-
-                  @Override
-                  public int intermediateComputeSizeBytes()
-                  {
-                    return 100 * 1024 * 1024;
-                  }
-
-                  @Override
-                  public int getNumThreads()
-                  {
-                    return 1;
-                  }
-
-                }
-            );
-            binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
-          }
+        binder -> {
+          binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
+          binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
+          binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
         }
     );
   }
diff --git a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java
index 925d7c81a8..f9e2ce627b 100644
--- a/services/src/main/java/org/apache/druid/cli/ValidateSegments.java
+++ b/services/src/main/java/org/apache/druid/cli/ValidateSegments.java
@@ -23,7 +23,6 @@ import com.github.rvesse.airline.annotations.Arguments;
 import com.github.rvesse.airline.annotations.Command;
 import com.github.rvesse.airline.annotations.restrictions.Required;
 import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
@@ -32,9 +31,7 @@ import org.apache.druid.guice.QueryRunnerFactoryModule;
 import org.apache.druid.guice.QueryableModule;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.column.ColumnConfig;
 
 import java.io.File;
 import java.util.List;
@@ -85,39 +82,10 @@ public class ValidateSegments extends GuiceRunnable
         new DruidProcessingModule(),
         new QueryableModule(),
         new QueryRunnerFactoryModule(),
-        new Module()
-        {
-          @Override
-          public void configure(Binder binder)
-          {
-            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
-            binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
-            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
-            binder.bind(DruidProcessingConfig.class).toInstance(
-                new DruidProcessingConfig()
-                {
-                  @Override
-                  public String getFormatString()
-                  {
-                    return "processing-%s";
-                  }
-
-                  @Override
-                  public int intermediateComputeSizeBytes()
-                  {
-                    return 100 * 1024 * 1024;
-                  }
-
-                  @Override
-                  public int getNumThreads()
-                  {
-                    return 1;
-                  }
-
-                }
-            );
-            binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
-          }
+        binder -> {
+          binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/tool");
+          binder.bindConstant().annotatedWith(Names.named("servicePort")).to(9999);
+          binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
         }
     );
   }
diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
index 10027ae73b..fb440311a6 100644
--- a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
+++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java
@@ -25,12 +25,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.inject.Injector;
 import com.google.inject.Key;
+import com.google.inject.name.Names;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.guice.NestedDataModule;
+import org.apache.druid.guice.StartupInjectorBuilder;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.initialization.ServerInjectorBuilder;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -48,6 +51,7 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
 import org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex;
@@ -215,6 +219,22 @@ public class DumpSegmentTest extends InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testGetModules()
+  {
+    DumpSegment dumpSegment = new DumpSegment();
+    Injector injector = ServerInjectorBuilder.makeServerInjector(
+        new StartupInjectorBuilder().forServer().build(),
+        Collections.emptySet(),
+        dumpSegment.getModules()
+    );
+    Assert.assertNotNull(injector.getInstance(ColumnConfig.class));
+    Assert.assertEquals("druid/tool", injector.getInstance(Key.get(String.class, Names.named("serviceName"))));
+    Assert.assertEquals(9999, (int) injector.getInstance(Key.get(Integer.class, Names.named("servicePort"))));
+    Assert.assertEquals(-1, (int) injector.getInstance(Key.get(Integer.class, Names.named("tlsServicePort"))));
+  }
+
+
   public static List<Segment> createSegments(
       AggregationTestHelper helper,
       TemporaryFolder tempFolder,
diff --git a/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java
new file mode 100644
index 0000000000..03ce574196
--- /dev/null
+++ b/services/src/test/java/org/apache/druid/cli/ValidateSegmentsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.name.Names;
+import org.apache.druid.data.input.ResourceInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.initialization.ServerInjectorBuilder;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class ValidateSegmentsTest extends InitializedNullHandlingTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testValidateSegments() throws IOException
+  {
+
+    JsonInputFormat inputFormat = new JsonInputFormat(
+        JSONPathSpec.DEFAULT,
+        null,
+        null,
+        null,
+        null
+    );
+    IndexBuilder bob = IndexBuilder.create()
+                                   .tmpDir(temporaryFolder.newFolder())
+                                   .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                   .schema(
+                                       new IncrementalIndexSchema.Builder()
+                                           .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec())
+                                           .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec())
+                                           .withMetrics(
+                                               new CountAggregatorFactory("cnt")
+                                           )
+                                           .withRollup(false)
+                                           .build()
+                                   )
+                                   .inputSource(
+                                       ResourceInputSource.of(
+                                           NestedDataTestUtils.class.getClassLoader(),
+                                           NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE
+                                       )
+                                   )
+                                   .inputFormat(inputFormat)
+                                   .inputTmpDir(temporaryFolder.newFolder());
+
+    final File segment1 = bob.buildMMappedIndexFile();
+    final File segment2 = bob.buildMMappedIndexFile();
+    final Injector injector = Mockito.mock(Injector.class);
+    Mockito.when(injector.getInstance(IndexIO.class)).thenReturn(bob.getIndexIO());
+    ValidateSegments validator = new ValidateSegments() {
+      @Override
+      public Injector makeInjector()
+      {
+        return injector;
+      }
+    };
+    validator.directories = Arrays.asList(segment1.getAbsolutePath(), segment2.getAbsolutePath());
+    // if this doesn't pass, it throws a runtime exception, which would fail the test
+    validator.run();
+  }
+
+  @Test
+  public void testGetModules()
+  {
+    ValidateSegments validator = new ValidateSegments();
+    Injector injector = ServerInjectorBuilder.makeServerInjector(
+        new StartupInjectorBuilder().forServer().build(),
+        Collections.emptySet(),
+        validator.getModules()
+    );
+    Assert.assertNotNull(injector.getInstance(ColumnConfig.class));
+    Assert.assertEquals("druid/tool", injector.getInstance(Key.get(String.class, Names.named("serviceName"))));
+    Assert.assertEquals(9999, (int) injector.getInstance(Key.get(Integer.class, Names.named("servicePort"))));
+    Assert.assertEquals(-1, (int) injector.getInstance(Key.get(Integer.class, Names.named("tlsServicePort"))));
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
index 0881a969fc..3467d71bb6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java
@@ -219,7 +219,7 @@ public class SqlTestFramework
       } else {
         return QueryStackTests.createQueryRunnerFactoryConglomerate(
             resourceCloser,
-            QueryStackTests.getProcessingConfig(true, builder.mergeBufferCount)
+            QueryStackTests.getProcessingConfig(builder.mergeBufferCount)
         );
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org