You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/09 23:02:44 UTC

[kafka] branch trunk updated: KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9b139220ee KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)
d9b139220ee is described below

commit d9b139220ee253da673af44d58dc87bd184188f1
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Dec 9 15:02:36 2022 -0800

    KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)
    
    First PR for KIP-878: Internal Topic Autoscaling for Kafka Streams
    
    Introduces two new configs related to autoscaling in Streams: a feature flag and retry timeout. This PR just adds the configs and gets them passed through to the Streams assignor where they'll ultimately be needed/used
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Walker Carlson <wc...@confluent.io>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  25 +++++
 .../assignment/AssignorConfiguration.java          |  12 +++
 .../apache/kafka/streams/StreamsConfigTest.java    |  16 ++++
 .../internals/assignment/AssignmentTestUtils.java  |  88 ++++++++++++++++++
 .../assignment/AssignorConfigurationTest.java      |   2 +-
 .../ClientTagAwareStandbyTaskAssignorTest.java     |   2 +
 .../assignment/FallbackPriorTaskAssignorTest.java  |   2 +-
 .../HighAvailabilityTaskAssignorTest.java          | 102 ++++++++-------------
 .../assignment/StandbyTaskAssignorFactoryTest.java |   4 +
 .../assignment/StickyTaskAssignorTest.java         |   4 +-
 .../assignment/TaskAssignorConvergenceTest.java    |  25 ++---
 11 files changed, 198 insertions(+), 84 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 9fabf3c4298..2d26b750005 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -614,6 +614,18 @@ public class StreamsConfig extends AbstractConfig {
     public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
     private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
 
+    /** {@code partition.autoscaling.enabled} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
+    private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling the partitions of internal topics which are managed by Streams."
+        + " If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input "
+        + "topic(s) will result in the internal topic(s) automatically being expanded to match.";
+
+    /** {@code partition.autoscaling.timeout.ms} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
+    private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";
+
     /** {@code poll.ms} */
     @SuppressWarnings("WeakerAccess")
     public static final String POLL_MS_CONFIG = "poll.ms";
@@ -975,6 +987,17 @@ public class StreamsConfig extends AbstractConfig {
                     true,
                     Importance.LOW,
                     CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
+            .define(PARTITION_AUTOSCALING_ENABLED_CONFIG,
+                    Type.BOOLEAN,
+                    false,
+                    Importance.LOW,
+                    PARTITION_AUTOSCALING_ENABLED_DOC)
+            .define(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG,
+                    Type.LONG,
+                    15 * 60 * 1000L,
+                    atLeast(0),
+                    Importance.LOW,
+                    PARTITION_AUTOSCALING_TIMEOUT_MS_DOC)
             .define(POLL_MS_CONFIG,
                     Type.LONG,
                     100L,
@@ -1511,6 +1534,8 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
         consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
         consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PARTITION_AUTOSCALING_ENABLED_CONFIG, getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
+        consumerProps.put(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG));
         consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
         consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
         consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 4a3d46cfc3b..0ac72b71587 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -259,10 +259,16 @@ public final class AssignorConfiguration {
         void onAssignmentComplete(final boolean stable);
     }
 
+    /**
+     * NOTE: any StreamsConfig you add here MUST be passed in to the consumer via
+     * {@link StreamsConfig#getMainConsumerConfigs}
+     */
     public static class AssignmentConfigs {
         public final long acceptableRecoveryLag;
         public final int maxWarmupReplicas;
         public final int numStandbyReplicas;
+        public final boolean partitionAutoscalingEnabled;
+        public final long partitionAutoscalingTimeoutMs;
         public final long probingRebalanceIntervalMs;
         public final List<String> rackAwareAssignmentTags;
 
@@ -270,6 +276,8 @@ public final class AssignorConfiguration {
             acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
             maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
             numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+            partitionAutoscalingEnabled = configs.getBoolean(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG);
+            partitionAutoscalingTimeoutMs = configs.getLong(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG);
             probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
             rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
         }
@@ -277,11 +285,15 @@ public final class AssignorConfiguration {
         AssignmentConfigs(final Long acceptableRecoveryLag,
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
+                          final boolean partitionAutoscalingEnabled,
+                          final long partitionAutoscalingTimeoutMs,
                           final Long probingRebalanceIntervalMs,
                           final List<String> rackAwareAssignmentTags) {
             this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
             this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
             this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
+            this.partitionAutoscalingEnabled = validated(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG, partitionAutoscalingEnabled);
+            this.partitionAutoscalingTimeoutMs = validated(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, partitionAutoscalingTimeoutMs);
             this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
             this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f0c0ce74ccd..b271e076db2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -61,6 +61,8 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
 import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
 import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
+import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
@@ -1368,6 +1370,20 @@ public class StreamsConfigTest {
         assertEquals(0, configs.size());
     }
 
+    @Test
+    public void shouldEnablePartitionAutoscaling() {
+        props.put("partition.autoscaling.enabled", true);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertTrue(config.getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
+    }
+
+    @Test
+    public void shouldSetPartitionAutoscalingTimeout() {
+        props.put("partition.autoscaling.timeout.ms", 0L);
+        final StreamsConfig config = new StreamsConfig(props);
+        assertThat(config.getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L));
+    }
+
     static class MisconfiguredSerde implements Serde<Object> {
         @Override
         public void configure(final Map<String, ?>  configs, final boolean isKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 78c6477f386..71c92de2e7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
 
 import org.easymock.EasyMock;
 import org.hamcrest.BaseMatcher;
@@ -112,6 +113,93 @@ public final class AssignmentTestUtils {
 
     private AssignmentTestUtils() {}
 
+    public static final long ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT = 100;
+
+    public static AssignmentConfigs getDefaultConfigsWithZeroStandbys() {
+        return new AssignmentConfigs(
+            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+            2,
+            0,
+            false,
+            90_000L,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+        );
+    }
+
+    public static AssignmentConfigs getDefaultConfigsWithOneStandbys() {
+        return new AssignmentConfigs(
+            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+            2,
+            1,
+            false,
+            90_000L,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+        );
+    }
+
+    public static AssignmentConfigs getConfigsWithZeroStandbysAndWarmups(final int maxWarmups) {
+        return new AssignmentConfigs(
+            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+            maxWarmups,
+            0,
+            false,
+            90_000L,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+        );
+    }
+
+    public static AssignmentConfigs getConfigsWithOneStandbysAndWarmups(final int maxWarmups) {
+        return new AssignmentConfigs(
+            ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+            maxWarmups,
+            1,
+            false,
+            90_000L,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+        );
+    }
+
+    public static AssignmentConfigs getConfigsWithOneStandbysAndZeroLagAndWarmups(final int maxWarmups) {
+        return new AssignmentConfigs(
+            0L,
+            maxWarmups,
+            1,
+            false,
+            90_000L,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+        );
+    }
+
+    public static AssignmentConfigs getConfigsWithZeroStandbysAndZeroLagAndWarmups(final int maxWarmups) {
+        return new AssignmentConfigs(
+            0L,
+            maxWarmups,
+            0,
+            false,
+            90_000L,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+        );
+    }
+
+    public static AssignmentConfigs getConfigsWithOneStandbysAndLagAndWarmups(final long acceptableRecoveryLag,
+                                                                              final int maxWarmups) {
+        return new AssignmentConfigs(
+            acceptableRecoveryLag,
+            maxWarmups,
+            1,
+            false,
+            90_000L,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+        );
+    }
+
     static Map<UUID, ClientState> getClientStatesMap(final ClientState... states) {
         final Map<UUID, ClientState> clientStates = new HashMap<>();
         int nthState = 1;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
index 9ff53b54244..ede16ce6372 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
@@ -30,7 +30,7 @@ public class AssignorConfigurationTest {
     public void configsShouldRejectZeroWarmups() {
         final ConfigException exception = assertThrows(
             ConfigException.class,
-            () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, false, 1L, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
 
         assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
index 631430c6a82..2b87dc09064 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
@@ -656,6 +656,8 @@ public class ClientTagAwareStandbyTaskAssignorTest {
         return new AssignmentConfigs(0L,
                                      1,
                                      numStandbyReplicas,
+                                     false,
+                                     1L,
                                      60000L,
                                      asList(rackAwareAssignmentTags));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
index 0473d9bee45..a43eb5fb857 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
@@ -54,7 +54,7 @@ public class FallbackPriorTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 1L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
         assertThat(probingRebalanceNeeded, is(true));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index 90e0fed51f3..708f6e3afb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -34,7 +33,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
@@ -56,6 +54,12 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndLagAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndZeroLagAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithOneStandbys;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
@@ -67,23 +71,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.fail;
 
-public class HighAvailabilityTaskAssignorTest {
-    private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
-        /*acceptableRecoveryLag*/ 100L,
-        /*maxWarmupReplicas*/ 2,
-        /*numStandbyReplicas*/ 0,
-        /*probingRebalanceIntervalMs*/ 60 * 1000L,
-        /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-    );
-
-    private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
-        /*acceptableRecoveryLag*/ 100L,
-        /*maxWarmupReplicas*/ 2,
-        /*numStandbyReplicas*/ 1,
-        /*probingRebalanceIntervalMs*/ 60 * 1000L,
-        /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-    );
-
+public class HighAvailabilityTaskAssignorTest {    
     @Test
     public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
         final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
@@ -101,7 +89,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(11L, 2, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithOneStandbysAndLagAndWarmups(11L, 2)
         );
 
         assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
@@ -130,7 +118,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithOneStandbysAndLagAndWarmups(Long.MAX_VALUE, 1)
         );
 
         assertThat(clientState1, hasAssignedTasks(6));
@@ -151,7 +139,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
         assertThat(unstable, is(false));
         assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -172,7 +160,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
         assertThat(unstable, is(false));
         assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -192,7 +180,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertThat(unstable, is(false));
@@ -214,7 +202,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertThat(unstable, is(false));
@@ -243,7 +231,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertThat(unstable, is(false));
@@ -265,7 +253,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertThat(unstable, is(false));
@@ -290,7 +278,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
         );
 
         assertThat(unstable, is(true));
@@ -320,7 +308,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
         );
 
         assertThat(unstable, is(false));
@@ -341,7 +329,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertThat(unstable, is(false));
@@ -361,7 +349,7 @@ public class HighAvailabilityTaskAssignorTest {
         final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
                                                                                          allTasks,
                                                                                          singleton(TASK_0_0),
-                                                                                         configWithoutStandbys);
+                                                                                         getDefaultConfigsWithZeroStandbys());
 
         assertThat(probingRebalanceNeeded, is(false));
         assertThat(client1, hasActiveTasks(2));
@@ -384,7 +372,7 @@ public class HighAvailabilityTaskAssignorTest {
         final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
                                                                                          allTasks,
                                                                                          statefulTasks,
-                                                                                         configWithStandbys);
+                                                                                         getDefaultConfigsWithOneStandbys());
 
         assertThat(clientStates.get(UUID_2).standbyTasks(), not(empty()));
         assertThat(probingRebalanceNeeded, is(false));
@@ -406,7 +394,7 @@ public class HighAvailabilityTaskAssignorTest {
         );
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
 
         assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_1)));
         assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_0)));
@@ -433,7 +421,7 @@ public class HighAvailabilityTaskAssignorTest {
         );
 
         final boolean probingRebalanceNeeded =
-                new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+                new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
 
         assertThat(clientStates.get(UUID_1).activeTasks(), is(emptySet()));
         assertThat(clientStates.get(UUID_2).activeTasks(), is(emptySet()));
@@ -460,7 +448,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
 
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0)));
@@ -480,7 +468,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
 
 
         assertThat(client1.activeTaskCount(), equalTo(1));
@@ -498,7 +486,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
 
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
@@ -521,13 +509,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            new AssignmentConfigs(
-                /*acceptableRecoveryLag*/ 100L,
-                /*maxWarmupReplicas*/ 1,
-                /*numStandbyReplicas*/ 0,
-                /*probingRebalanceIntervalMs*/ 60 * 1000L,
-                /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-            )
+            getConfigsWithZeroStandbysAndWarmups(1)
         );
 
 
@@ -550,13 +532,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            new AssignmentConfigs(
-                /*acceptableRecoveryLag*/ 100L,
-                /*maxWarmupReplicas*/ 1,
-                /*numStandbyReplicas*/ 1,
-                /*probingRebalanceIntervalMs*/ 60 * 1000L,
-                /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-            )
+            getConfigsWithOneStandbysAndWarmups(1)
         );
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
@@ -574,7 +550,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
 
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
         assertHasNoStandbyTasks(client1);
@@ -590,7 +566,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
         assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
         assertHasNoStandbyTasks(client1);
         assertThat(probingRebalanceNeeded, is(false));
@@ -607,7 +583,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
 
         assertValidAssignment(
             1,
@@ -632,7 +608,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
         assertValidAssignment(
             1,
             2,
@@ -665,7 +641,7 @@ public class HighAvailabilityTaskAssignorTest {
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
 
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
 
         assertThat(client1.activeTasks(), not(empty()));
         assertThat(client2.activeTasks(), not(empty()));
@@ -682,7 +658,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
 
         assertThat(probingRebalanceNeeded, is(false));
         assertThat(client1.activeTasks(), equalTo(client1.prevActiveTasks()));
@@ -698,7 +674,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
         assertThat(probingRebalanceNeeded, is(false));
         assertHasNoStandbyTasks(client1, client2);
     }
@@ -712,7 +688,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
         final boolean probingRebalanceNeeded =
-            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
         assertThat(probingRebalanceNeeded, is(true));
         assertThat(client2.standbyTaskCount(), equalTo(1));
     }
@@ -734,7 +710,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertValidAssignment(
@@ -765,7 +741,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertValidAssignment(
@@ -796,7 +772,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertValidAssignment(
@@ -827,7 +803,7 @@ public class HighAvailabilityTaskAssignorTest {
             clientStates,
             allTasks,
             statefulTasks,
-            new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
         );
 
         assertValidAssignment(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
index fdd7fa1d473..0c22cd290e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
@@ -28,6 +28,8 @@ public class StandbyTaskAssignorFactoryTest {
     private static final long ACCEPTABLE_RECOVERY_LAG = 0L;
     private static final int MAX_WARMUP_REPLICAS = 1;
     private static final int NUMBER_OF_STANDBY_REPLICAS = 1;
+    private static final boolean PARTITION_AUTOSCALING_ENABLED = false;
+    private static final long PARTITION_AUTOSCALING_TIMEOUT_MS = 90000L;
     private static final long PROBING_REBALANCE_INTERVAL_MS = 60000L;
 
     @Test
@@ -46,6 +48,8 @@ public class StandbyTaskAssignorFactoryTest {
         return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG,
                                                            MAX_WARMUP_REPLICAS,
                                                            NUMBER_OF_STANDBY_REPLICAS,
+                                                           PARTITION_AUTOSCALING_ENABLED,
+                                                           PARTITION_AUTOSCALING_TIMEOUT_MS,
                                                            PROBING_REBALANCE_INTERVAL_MS,
                                                            rackAwareAssignmentTags);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 8c1347f22d9..82e3de4baf7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -677,7 +677,7 @@ public class StickyTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
         assertThat(probingRebalanceNeeded, is(false));
 
@@ -696,7 +696,7 @@ public class StickyTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index c1be5f33fa2..2b1564fa1a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -29,11 +29,13 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.function.Supplier;
 
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.appendClientStates;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
 import static org.junit.Assert.fail;
 
@@ -229,11 +231,7 @@ public class TaskAssignorConvergenceTest {
 
     @Test
     public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
-        final AssignmentConfigs configs = new AssignmentConfigs(100L,
-                                                                2,
-                                                                0,
-                                                                60_000L,
-                                                                EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+        final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
 
         final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
@@ -249,11 +247,7 @@ public class TaskAssignorConvergenceTest {
         final int maxWarmupReplicas = 2;
         final int numStandbyReplicas = 0;
 
-        final AssignmentConfigs configs = new AssignmentConfigs(100L,
-                                                                maxWarmupReplicas,
-                                                                numStandbyReplicas,
-                                                                60_000L,
-                                                                EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+        final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
 
         final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5);
         testForConvergence(harness, configs, 1);
@@ -272,11 +266,7 @@ public class TaskAssignorConvergenceTest {
         final int maxWarmupReplicas = 2;
         final int numStandbyReplicas = 0;
 
-        final AssignmentConfigs configs = new AssignmentConfigs(100L,
-                                                                maxWarmupReplicas,
-                                                                numStandbyReplicas,
-                                                                60_000L,
-                                                                EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+        final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
 
         final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5);
         testForConvergence(harness, configs, 1);
@@ -314,9 +304,11 @@ public class TaskAssignorConvergenceTest {
 
             final int numberOfEvents = prng.nextInt(10) + 1;
 
-            final AssignmentConfigs configs = new AssignmentConfigs(100L,
+            final AssignmentConfigs configs = new AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
                                                                     maxWarmupReplicas,
                                                                     numStandbyReplicas,
+                                                                    false,
+                                                                    90_000L,
                                                                     60_000L,
                                                                     EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
 
@@ -427,5 +419,4 @@ public class TaskAssignorConvergenceTest {
         }
     }
 
-
 }