You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/14 20:01:07 UTC

[kafka] branch 3.5 updated: Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.5 by this push:
     new b14ebf9e9cf Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527)
b14ebf9e9cf is described below

commit b14ebf9e9cf310ff03dfe9ca3d2cdd7fdafc8944
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 14 12:08:49 2023 -0700

    Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527)
    
    This reverts commit d9b139220ee253da673af44d58dc87bd184188f1.
    
    KIP-878 implementation did not make any progress, so we need to revert
    the public API changes which are not functional right now.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  25 -----
 .../assignment/AssignorConfiguration.java          |  12 ---
 .../apache/kafka/streams/StreamsConfigTest.java    |  16 ----
 .../internals/assignment/AssignmentTestUtils.java  |  88 ------------------
 .../assignment/AssignorConfigurationTest.java      |   2 +-
 .../ClientTagAwareStandbyTaskAssignorTest.java     |   2 -
 .../assignment/FallbackPriorTaskAssignorTest.java  |   2 +-
 .../HighAvailabilityTaskAssignorTest.java          | 102 +++++++++++++--------
 .../assignment/StandbyTaskAssignorFactoryTest.java |   4 -
 .../assignment/StickyTaskAssignorTest.java         |   4 +-
 .../assignment/TaskAssignorConvergenceTest.java    |  25 +++--
 11 files changed, 84 insertions(+), 198 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bb0c519d380..6eb34dfd9dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -631,18 +631,6 @@ public class StreamsConfig extends AbstractConfig {
     public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
     private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
 
-    /** {@code partition.autoscaling.enabled} */
-    @SuppressWarnings("WeakerAccess")
-    public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
-    private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling the partitions of internal topics which are managed by Streams."
-        + " If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input "
-        + "topic(s) will result in the internal topic(s) automatically being expanded to match.";
-
-    /** {@code partition.autoscaling.timeout.ms} */
-    @SuppressWarnings("WeakerAccess")
-    public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
-    private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";
-
     /** {@code poll.ms} */
     @SuppressWarnings("WeakerAccess")
     public static final String POLL_MS_CONFIG = "poll.ms";
@@ -1014,17 +1002,6 @@ public class StreamsConfig extends AbstractConfig {
                     true,
                     Importance.LOW,
                     CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
-            .define(PARTITION_AUTOSCALING_ENABLED_CONFIG,
-                    Type.BOOLEAN,
-                    false,
-                    Importance.LOW,
-                    PARTITION_AUTOSCALING_ENABLED_DOC)
-            .define(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG,
-                    Type.LONG,
-                    15 * 60 * 1000L,
-                    atLeast(0),
-                    Importance.LOW,
-                    PARTITION_AUTOSCALING_TIMEOUT_MS_DOC)
             .define(POLL_MS_CONFIG,
                     Type.LONG,
                     100L,
@@ -1563,8 +1540,6 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
         consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
         consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
-        consumerProps.put(PARTITION_AUTOSCALING_ENABLED_CONFIG, getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
-        consumerProps.put(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG));
         consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
         consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
         consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 3825a33eb34..d40489eab29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -261,16 +261,10 @@ public final class AssignorConfiguration {
         void onAssignmentComplete(final boolean stable);
     }
 
-    /**
-     * NOTE: any StreamsConfig you add here MUST be passed in to the consumer via
-     * {@link StreamsConfig#getMainConsumerConfigs}
-     */
     public static class AssignmentConfigs {
         public final long acceptableRecoveryLag;
         public final int maxWarmupReplicas;
         public final int numStandbyReplicas;
-        public final boolean partitionAutoscalingEnabled;
-        public final long partitionAutoscalingTimeoutMs;
         public final long probingRebalanceIntervalMs;
         public final List<String> rackAwareAssignmentTags;
 
@@ -278,8 +272,6 @@ public final class AssignorConfiguration {
             acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
             maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
             numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
-            partitionAutoscalingEnabled = configs.getBoolean(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG);
-            partitionAutoscalingTimeoutMs = configs.getLong(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG);
             probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
             rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
         }
@@ -287,15 +279,11 @@ public final class AssignorConfiguration {
         AssignmentConfigs(final Long acceptableRecoveryLag,
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
-                          final boolean partitionAutoscalingEnabled,
-                          final long partitionAutoscalingTimeoutMs,
                           final Long probingRebalanceIntervalMs,
                           final List<String> rackAwareAssignmentTags) {
             this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
             this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
             this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
-            this.partitionAutoscalingEnabled = validated(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG, partitionAutoscalingEnabled);
-            this.partitionAutoscalingTimeoutMs = validated(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, partitionAutoscalingTimeoutMs);
             this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
             this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 29c1be977c5..05582b74aee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -62,8 +62,6 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
 import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
 import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
-import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG;
-import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
@@ -1371,20 +1369,6 @@ public class StreamsConfigTest {
         assertEquals(0, configs.size());
     }
 
-    @Test
-    public void shouldEnablePartitionAutoscaling() {
-        props.put("partition.autoscaling.enabled", true);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertTrue(config.getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
-    }
-
-    @Test
-    public void shouldSetPartitionAutoscalingTimeout() {
-        props.put("partition.autoscaling.timeout.ms", 0L);
-        final StreamsConfig config = new StreamsConfig(props);
-        assertThat(config.getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L));
-    }
-
     @Test
     public void shouldReturnDefaultClientSupplier() {
         final KafkaClientSupplier supplier = streamsConfig.getKafkaClientSupplier();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 181bedab6f5..8993372dd5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -113,93 +112,6 @@ public final class AssignmentTestUtils {
 
     private AssignmentTestUtils() {}
 
-    public static final long ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT = 100;
-
-    public static AssignmentConfigs getDefaultConfigsWithZeroStandbys() {
-        return new AssignmentConfigs(
-            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
-            2,
-            0,
-            false,
-            90_000L,
-            60_000L,
-            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-        );
-    }
-
-    public static AssignmentConfigs getDefaultConfigsWithOneStandbys() {
-        return new AssignmentConfigs(
-            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
-            2,
-            1,
-            false,
-            90_000L,
-            60_000L,
-            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-        );
-    }
-
-    public static AssignmentConfigs getConfigsWithZeroStandbysAndWarmups(final int maxWarmups) {
-        return new AssignmentConfigs(
-            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
-            maxWarmups,
-            0,
-            false,
-            90_000L,
-            60_000L,
-            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-        );
-    }
-
-    public static AssignmentConfigs getConfigsWithOneStandbysAndWarmups(final int maxWarmups) {
-        return new AssignmentConfigs(
-            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
-            maxWarmups,
-            1,
-            false,
-            90_000L,
-            60_000L,
-            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-        );
-    }
-
-    public static AssignmentConfigs getConfigsWithOneStandbysAndZeroLagAndWarmups(final int maxWarmups) {
-        return new AssignmentConfigs(
-            0L,
-            maxWarmups,
-            1,
-            false,
-            90_000L,
-            60_000L,
-            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-        );
-    }
-
-    public static AssignmentConfigs getConfigsWithZeroStandbysAndZeroLagAndWarmups(final int maxWarmups) {
-        return new AssignmentConfigs(
-            0L,
-            maxWarmups,
-            0,
-            false,
-            90_000L,
-            60_000L,
-            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-        );
-    }
-
-    public static AssignmentConfigs getConfigsWithOneStandbysAndLagAndWarmups(final long acceptableRecoveryLag,
-                                                                              final int maxWarmups) {
-        return new AssignmentConfigs(
-            acceptableRecoveryLag,
-            maxWarmups,
-            1,
-            false,
-            90_000L,
-            60_000L,
-            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-        );
-    }
-
     static Map<UUID, ClientState> getClientStatesMap(final ClientState... states) {
         final Map<UUID, ClientState> clientStates = new HashMap<>();
         int nthState = 1;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
index ede16ce6372..9ff53b54244 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
@@ -30,7 +30,7 @@ public class AssignorConfigurationTest {
     public void configsShouldRejectZeroWarmups() {
         final ConfigException exception = assertThrows(
             ConfigException.class,
-            () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, false, 1L, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
index 2b87dc09064..631430c6a82 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
@@ -656,8 +656,6 @@ public class ClientTagAwareStandbyTaskAssignorTest {
         return new AssignmentConfigs(0L,
                                      1,
                                      numStandbyReplicas,
-                                     false,
-                                     1L,
                                      60000L,
                                      asList(rackAwareAssignmentTags));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
index a43eb5fb857..0473d9bee45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
@@ -54,7 +54,7 @@ public class FallbackPriorTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 1L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
         assertThat(probingRebalanceNeeded, is(true));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index 708f6e3afb0..90e0fed51f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -33,6 +34,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
@@ -54,12 +56,6 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndLagAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndZeroLagAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithOneStandbys;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
@@ -71,7 +67,23 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.fail;
 
-public class HighAvailabilityTaskAssignorTest {    
+public class HighAvailabilityTaskAssignorTest {
+    private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
+        /*acceptableRecoveryLag*/ 100L,
+        /*maxWarmupReplicas*/ 2,
+        /*numStandbyReplicas*/ 0,
+        /*probingRebalanceIntervalMs*/ 60 * 1000L,
+        /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+    );
+
+    private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+        /*acceptableRecoveryLag*/ 100L,
+        /*maxWarmupReplicas*/ 2,
+        /*numStandbyReplicas*/ 1,
+        /*probingRebalanceIntervalMs*/ 60 * 1000L,
+        /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+    );
+
     @Test
     public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
         final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
@@ -89,7 +101,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithOneStandbysAndLagAndWarmups(11L, 2)
+            new AssignmentConfigs(11L, 2, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
@@ -118,7 +130,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithOneStandbysAndLagAndWarmups(Long.MAX_VALUE, 1)
+            new AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(clientState1, hasAssignedTasks(6));
@@ -139,7 +151,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
         assertThat(unstable, is(false));
         assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -160,7 +172,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
         assertThat(unstable, is(false));
         assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -180,7 +192,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(unstable, is(false));
@@ -202,7 +214,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(unstable, is(false));
@@ -231,7 +243,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(unstable, is(false));
@@ -253,7 +265,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(unstable, is(false));
@@ -278,7 +290,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
+            new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(unstable, is(true));
@@ -308,7 +320,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
+            new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(unstable, is(false));
@@ -329,7 +341,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(unstable, is(false));
@@ -349,7 +361,7 @@ public class HighAvailabilityTaskAssignorTest {
         final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
                                                                                          allTasks,
                                                                                          singleton(TASK_0_0),
-                                                                                         getDefaultConfigsWithZeroStandbys());
+                                                                                         configWithoutStandbys);
 
         assertThat(probingRebalanceNeeded, is(false));
         assertThat(client1, hasActiveTasks(2));
@@ -372,7 +384,7 @@ public class HighAvailabilityTaskAssignorTest {
         final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
                                                                                          allTasks,
                                                                                          statefulTasks,
-                                                                                         getDefaultConfigsWithOneStandbys());
+                                                                                         configWithStandbys);
 
         assertThat(clientStates.get(UUID_2).standbyTasks(), not(empty()));
         assertThat(probingRebalanceNeeded, is(false));
@@ -394,7 +406,7 @@ public class HighAvailabilityTaskAssignorTest {
         );
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
 
         assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_1)));
         assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_0)));
@@ -421,7 +433,7 @@ public class HighAvailabilityTaskAssignorTest {
         );
 
         final boolean probingRebalanceNeeded =
-                new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+                new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
 
         assertThat(clientStates.get(UUID_1).activeTasks(), is(emptySet()));
         assertThat(clientStates.get(UUID_2).activeTasks(), is(emptySet()));
@@ -448,7 +460,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
 
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0)));
@@ -468,7 +480,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
 
 
         assertThat(client1.activeTaskCount(), equalTo(1));
@@ -486,7 +498,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
 
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
@@ -509,7 +521,13 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            getConfigsWithZeroStandbysAndWarmups(1)
+            new AssignmentConfigs(
+                /*acceptableRecoveryLag*/ 100L,
+                /*maxWarmupReplicas*/ 1,
+                /*numStandbyReplicas*/ 0,
+                /*probingRebalanceIntervalMs*/ 60 * 1000L,
+                /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+            )
         );
 
 
@@ -532,7 +550,13 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            getConfigsWithOneStandbysAndWarmups(1)
+            new AssignmentConfigs(
+                /*acceptableRecoveryLag*/ 100L,
+                /*maxWarmupReplicas*/ 1,
+                /*numStandbyReplicas*/ 1,
+                /*probingRebalanceIntervalMs*/ 60 * 1000L,
+                /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+            )
         );
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
@@ -550,7 +574,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
         assertHasNoStandbyTasks(client1);
@@ -566,7 +590,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
         assertHasNoStandbyTasks(client1);
         assertThat(probingRebalanceNeeded, is(false));
@@ -583,7 +607,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
 
         assertValidAssignment(
             1,
@@ -608,7 +632,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
         assertValidAssignment(
             1,
             2,
@@ -641,7 +665,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
 
         assertThat(client1.activeTasks(), not(empty()));
         assertThat(client2.activeTasks(), not(empty()));
@@ -658,7 +682,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
 
         assertThat(probingRebalanceNeeded, is(false));
         assertThat(client1.activeTasks(), equalTo(client1.prevActiveTasks()));
@@ -674,7 +698,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
         assertThat(probingRebalanceNeeded, is(false));
         assertHasNoStandbyTasks(client1, client2);
     }
@@ -688,7 +712,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
         assertThat(probingRebalanceNeeded, is(true));
         assertThat(client2.standbyTaskCount(), equalTo(1));
     }
@@ -710,7 +734,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertValidAssignment(
@@ -741,7 +765,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertValidAssignment(
@@ -772,7 +796,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertValidAssignment(
@@ -803,7 +827,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertValidAssignment(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
index 0c22cd290e7..fdd7fa1d473 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
@@ -28,8 +28,6 @@ public class StandbyTaskAssignorFactoryTest {
     private static final long ACCEPTABLE_RECOVERY_LAG = 0L;
     private static final int MAX_WARMUP_REPLICAS = 1;
     private static final int NUMBER_OF_STANDBY_REPLICAS = 1;
-    private static final boolean PARTITION_AUTOSCALING_ENABLED = false;
-    private static final long PARTITION_AUTOSCALING_TIMEOUT_MS = 90000L;
     private static final long PROBING_REBALANCE_INTERVAL_MS = 60000L;
 
     @Test
@@ -48,8 +46,6 @@ public class StandbyTaskAssignorFactoryTest {
         return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG,
                                                            MAX_WARMUP_REPLICAS,
                                                            NUMBER_OF_STANDBY_REPLICAS,
-                                                           PARTITION_AUTOSCALING_ENABLED,
-                                                           PARTITION_AUTOSCALING_TIMEOUT_MS,
                                                            PROBING_REBALANCE_INTERVAL_MS,
                                                            rackAwareAssignmentTags);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 82e3de4baf7..8c1347f22d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -677,7 +677,7 @@ public class StickyTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
         assertThat(probingRebalanceNeeded, is(false));
 
@@ -696,7 +696,7 @@ public class StickyTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 2b1564fa1a1..c1be5f33fa2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -29,13 +29,11 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.function.Supplier;
 
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.appendClientStates;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
 import static org.junit.Assert.fail;
 
@@ -231,7 +229,11 @@ public class TaskAssignorConvergenceTest {
 
     @Test
     public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
-        final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                2,
+                                                                0,
+                                                                60_000L,
+                                                                EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
 
         final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
@@ -247,7 +249,11 @@ public class TaskAssignorConvergenceTest {
         final int maxWarmupReplicas = 2;
         final int numStandbyReplicas = 0;
 
-        final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                maxWarmupReplicas,
+                                                                numStandbyReplicas,
+                                                                60_000L,
+                                                                EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
 
         final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5);
         testForConvergence(harness, configs, 1);
@@ -266,7 +272,11 @@ public class TaskAssignorConvergenceTest {
         final int maxWarmupReplicas = 2;
         final int numStandbyReplicas = 0;
 
-        final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                maxWarmupReplicas,
+                                                                numStandbyReplicas,
+                                                                60_000L,
+                                                                EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
 
         final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5);
         testForConvergence(harness, configs, 1);
@@ -304,11 +314,9 @@ public class TaskAssignorConvergenceTest {
 
             final int numberOfEvents = prng.nextInt(10) + 1;
 
-            final AssignmentConfigs configs = new AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+            final AssignmentConfigs configs = new AssignmentConfigs(100L,
                                                                     maxWarmupReplicas,
                                                                     numStandbyReplicas,
-                                                                    false,
-                                                                    90_000L,
                                                                     60_000L,
                                                                     EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
 
@@ -419,4 +427,5 @@ public class TaskAssignorConvergenceTest {
         }
     }
 
+
 }