You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/09 23:02:44 UTC
[kafka] branch trunk updated: KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d9b139220ee KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)
d9b139220ee is described below
commit d9b139220ee253da673af44d58dc87bd184188f1
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Dec 9 15:02:36 2022 -0800
KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)
First PR for KIP-878: Internal Topic Autoscaling for Kafka Streams
Introduces two new configs related to autoscaling in Streams: a feature flag and retry timeout. This PR just adds the configs and gets them passed through to the Streams assignor where they'll ultimately be needed/used
Reviewers: Bill Bejeck <bi...@confluent.io>, Walker Carlson <wc...@confluent.io>
---
.../org/apache/kafka/streams/StreamsConfig.java | 25 +++++
.../assignment/AssignorConfiguration.java | 12 +++
.../apache/kafka/streams/StreamsConfigTest.java | 16 ++++
.../internals/assignment/AssignmentTestUtils.java | 88 ++++++++++++++++++
.../assignment/AssignorConfigurationTest.java | 2 +-
.../ClientTagAwareStandbyTaskAssignorTest.java | 2 +
.../assignment/FallbackPriorTaskAssignorTest.java | 2 +-
.../HighAvailabilityTaskAssignorTest.java | 102 ++++++++-------------
.../assignment/StandbyTaskAssignorFactoryTest.java | 4 +
.../assignment/StickyTaskAssignorTest.java | 4 +-
.../assignment/TaskAssignorConvergenceTest.java | 25 ++---
11 files changed, 198 insertions(+), 84 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 9fabf3c4298..2d26b750005 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -614,6 +614,18 @@ public class StreamsConfig extends AbstractConfig {
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+ /** {@code partition.autoscaling.enabled} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
+ private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling the partitions of internal topics which are managed by Streams."
+ + " If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input "
+ + "topic(s) will result in the internal topic(s) automatically being expanded to match.";
+
+ /** {@code partition.autoscaling.timeout.ms} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
+ private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";
+
/** {@code poll.ms} */
@SuppressWarnings("WeakerAccess")
public static final String POLL_MS_CONFIG = "poll.ms";
@@ -975,6 +987,17 @@ public class StreamsConfig extends AbstractConfig {
true,
Importance.LOW,
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
+ .define(PARTITION_AUTOSCALING_ENABLED_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ PARTITION_AUTOSCALING_ENABLED_DOC)
+ .define(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG,
+ Type.LONG,
+ 15 * 60 * 1000L,
+ atLeast(0),
+ Importance.LOW,
+ PARTITION_AUTOSCALING_TIMEOUT_MS_DOC)
.define(POLL_MS_CONFIG,
Type.LONG,
100L,
@@ -1511,6 +1534,8 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+ consumerProps.put(PARTITION_AUTOSCALING_ENABLED_CONFIG, getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
+ consumerProps.put(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG));
consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 4a3d46cfc3b..0ac72b71587 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -259,10 +259,16 @@ public final class AssignorConfiguration {
void onAssignmentComplete(final boolean stable);
}
+ /**
+ * NOTE: any StreamsConfig you add here MUST be passed in to the consumer via
+ * {@link StreamsConfig#getMainConsumerConfigs}
+ */
public static class AssignmentConfigs {
public final long acceptableRecoveryLag;
public final int maxWarmupReplicas;
public final int numStandbyReplicas;
+ public final boolean partitionAutoscalingEnabled;
+ public final long partitionAutoscalingTimeoutMs;
public final long probingRebalanceIntervalMs;
public final List<String> rackAwareAssignmentTags;
@@ -270,6 +276,8 @@ public final class AssignorConfiguration {
acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+ partitionAutoscalingEnabled = configs.getBoolean(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG);
+ partitionAutoscalingTimeoutMs = configs.getLong(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG);
probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
}
@@ -277,11 +285,15 @@ public final class AssignorConfiguration {
AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
+ final boolean partitionAutoscalingEnabled,
+ final long partitionAutoscalingTimeoutMs,
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags) {
this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
+ this.partitionAutoscalingEnabled = validated(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG, partitionAutoscalingEnabled);
+ this.partitionAutoscalingTimeoutMs = validated(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, partitionAutoscalingTimeoutMs);
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f0c0ce74ccd..b271e076db2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -61,6 +61,8 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
+import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
@@ -1368,6 +1370,20 @@ public class StreamsConfigTest {
assertEquals(0, configs.size());
}
+ @Test
+ public void shouldEnablePartitionAutoscaling() {
+ props.put("partition.autoscaling.enabled", true);
+ final StreamsConfig config = new StreamsConfig(props);
+ assertTrue(config.getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
+ }
+
+ @Test
+ public void shouldSetPartitionAutoscalingTimeout() {
+ props.put("partition.autoscaling.timeout.ms", 0L);
+ final StreamsConfig config = new StreamsConfig(props);
+ assertThat(config.getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L));
+ }
+
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 78c6477f386..71c92de2e7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.easymock.EasyMock;
import org.hamcrest.BaseMatcher;
@@ -112,6 +113,93 @@ public final class AssignmentTestUtils {
private AssignmentTestUtils() {}
+ public static final long ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT = 100;
+
+ public static AssignmentConfigs getDefaultConfigsWithZeroStandbys() {
+ return new AssignmentConfigs(
+ ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+ 2,
+ 0,
+ false,
+ 90_000L,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+ }
+
+ public static AssignmentConfigs getDefaultConfigsWithOneStandbys() {
+ return new AssignmentConfigs(
+ ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+ 2,
+ 1,
+ false,
+ 90_000L,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+ }
+
+ public static AssignmentConfigs getConfigsWithZeroStandbysAndWarmups(final int maxWarmups) {
+ return new AssignmentConfigs(
+ ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+ maxWarmups,
+ 0,
+ false,
+ 90_000L,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+ }
+
+ public static AssignmentConfigs getConfigsWithOneStandbysAndWarmups(final int maxWarmups) {
+ return new AssignmentConfigs(
+ ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+ maxWarmups,
+ 1,
+ false,
+ 90_000L,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+ }
+
+ public static AssignmentConfigs getConfigsWithOneStandbysAndZeroLagAndWarmups(final int maxWarmups) {
+ return new AssignmentConfigs(
+ 0L,
+ maxWarmups,
+ 1,
+ false,
+ 90_000L,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+ }
+
+ public static AssignmentConfigs getConfigsWithZeroStandbysAndZeroLagAndWarmups(final int maxWarmups) {
+ return new AssignmentConfigs(
+ 0L,
+ maxWarmups,
+ 0,
+ false,
+ 90_000L,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+ }
+
+ public static AssignmentConfigs getConfigsWithOneStandbysAndLagAndWarmups(final long acceptableRecoveryLag,
+ final int maxWarmups) {
+ return new AssignmentConfigs(
+ acceptableRecoveryLag,
+ maxWarmups,
+ 1,
+ false,
+ 90_000L,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+ }
+
static Map<UUID, ClientState> getClientStatesMap(final ClientState... states) {
final Map<UUID, ClientState> clientStates = new HashMap<>();
int nthState = 1;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
index 9ff53b54244..ede16ce6372 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
@@ -30,7 +30,7 @@ public class AssignorConfigurationTest {
public void configsShouldRejectZeroWarmups() {
final ConfigException exception = assertThrows(
ConfigException.class,
- () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, false, 1L, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
index 631430c6a82..2b87dc09064 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
@@ -656,6 +656,8 @@ public class ClientTagAwareStandbyTaskAssignorTest {
return new AssignmentConfigs(0L,
1,
numStandbyReplicas,
+ false,
+ 1L,
60000L,
asList(rackAwareAssignmentTags));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
index 0473d9bee45..a43eb5fb857 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
@@ -54,7 +54,7 @@ public class FallbackPriorTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 1L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(probingRebalanceNeeded, is(true));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index 90e0fed51f3..708f6e3afb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.junit.Test;
import java.util.HashMap;
@@ -34,7 +33,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
@@ -56,6 +54,12 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndLagAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndZeroLagAndWarmups;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithOneStandbys;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
@@ -67,23 +71,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
-public class HighAvailabilityTaskAssignorTest {
- private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 2,
- /*numStandbyReplicas*/ 0,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
-
- private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 2,
- /*numStandbyReplicas*/ 1,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
-
+public class HighAvailabilityTaskAssignorTest {
@Test
public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
@@ -101,7 +89,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(11L, 2, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithOneStandbysAndLagAndWarmups(11L, 2)
);
assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
@@ -130,7 +118,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithOneStandbysAndLagAndWarmups(Long.MAX_VALUE, 1)
);
assertThat(clientState1, hasAssignedTasks(6));
@@ -151,7 +139,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertThat(unstable, is(false));
assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -172,7 +160,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertThat(unstable, is(false));
assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -192,7 +180,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertThat(unstable, is(false));
@@ -214,7 +202,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertThat(unstable, is(false));
@@ -243,7 +231,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertThat(unstable, is(false));
@@ -265,7 +253,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertThat(unstable, is(false));
@@ -290,7 +278,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
);
assertThat(unstable, is(true));
@@ -320,7 +308,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
);
assertThat(unstable, is(false));
@@ -341,7 +329,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertThat(unstable, is(false));
@@ -361,7 +349,7 @@ public class HighAvailabilityTaskAssignorTest {
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
allTasks,
singleton(TASK_0_0),
- configWithoutStandbys);
+ getDefaultConfigsWithZeroStandbys());
assertThat(probingRebalanceNeeded, is(false));
assertThat(client1, hasActiveTasks(2));
@@ -384,7 +372,7 @@ public class HighAvailabilityTaskAssignorTest {
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
allTasks,
statefulTasks,
- configWithStandbys);
+ getDefaultConfigsWithOneStandbys());
assertThat(clientStates.get(UUID_2).standbyTasks(), not(empty()));
assertThat(probingRebalanceNeeded, is(false));
@@ -406,7 +394,7 @@ public class HighAvailabilityTaskAssignorTest {
);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_1)));
assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_0)));
@@ -433,7 +421,7 @@ public class HighAvailabilityTaskAssignorTest {
);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
assertThat(clientStates.get(UUID_1).activeTasks(), is(emptySet()));
assertThat(clientStates.get(UUID_2).activeTasks(), is(emptySet()));
@@ -460,7 +448,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0)));
@@ -480,7 +468,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
assertThat(client1.activeTaskCount(), equalTo(1));
@@ -498,7 +486,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
@@ -521,13 +509,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- new AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 1,
- /*numStandbyReplicas*/ 0,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- )
+ getConfigsWithZeroStandbysAndWarmups(1)
);
@@ -550,13 +532,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- new AssignmentConfigs(
- /*acceptableRecoveryLag*/ 100L,
- /*maxWarmupReplicas*/ 1,
- /*numStandbyReplicas*/ 1,
- /*probingRebalanceIntervalMs*/ 60 * 1000L,
- /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- )
+ getConfigsWithOneStandbysAndWarmups(1)
);
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
@@ -574,7 +550,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
assertHasNoStandbyTasks(client1);
@@ -590,7 +566,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
assertHasNoStandbyTasks(client1);
assertThat(probingRebalanceNeeded, is(false));
@@ -607,7 +583,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
assertValidAssignment(
1,
@@ -632,7 +608,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
assertValidAssignment(
1,
2,
@@ -665,7 +641,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
assertThat(client1.activeTasks(), not(empty()));
assertThat(client2.activeTasks(), not(empty()));
@@ -682,7 +658,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
assertThat(probingRebalanceNeeded, is(false));
assertThat(client1.activeTasks(), equalTo(client1.prevActiveTasks()));
@@ -698,7 +674,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
assertThat(probingRebalanceNeeded, is(false));
assertHasNoStandbyTasks(client1, client2);
}
@@ -712,7 +688,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
assertThat(probingRebalanceNeeded, is(true));
assertThat(client2.standbyTaskCount(), equalTo(1));
}
@@ -734,7 +710,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertValidAssignment(
@@ -765,7 +741,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertValidAssignment(
@@ -796,7 +772,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertValidAssignment(
@@ -827,7 +803,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
);
assertValidAssignment(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
index fdd7fa1d473..0c22cd290e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
@@ -28,6 +28,8 @@ public class StandbyTaskAssignorFactoryTest {
private static final long ACCEPTABLE_RECOVERY_LAG = 0L;
private static final int MAX_WARMUP_REPLICAS = 1;
private static final int NUMBER_OF_STANDBY_REPLICAS = 1;
+ private static final boolean PARTITION_AUTOSCALING_ENABLED = false;
+ private static final long PARTITION_AUTOSCALING_TIMEOUT_MS = 90000L;
private static final long PROBING_REBALANCE_INTERVAL_MS = 60000L;
@Test
@@ -46,6 +48,8 @@ public class StandbyTaskAssignorFactoryTest {
return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG,
MAX_WARMUP_REPLICAS,
NUMBER_OF_STANDBY_REPLICAS,
+ PARTITION_AUTOSCALING_ENABLED,
+ PARTITION_AUTOSCALING_TIMEOUT_MS,
PROBING_REBALANCE_INTERVAL_MS,
rackAwareAssignmentTags);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 8c1347f22d9..82e3de4baf7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -677,7 +677,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(probingRebalanceNeeded, is(false));
@@ -696,7 +696,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index c1be5f33fa2..2b1564fa1a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -29,11 +29,13 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Supplier;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.appendClientStates;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
import static org.junit.Assert.fail;
@@ -229,11 +231,7 @@ public class TaskAssignorConvergenceTest {
@Test
public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
- final AssignmentConfigs configs = new AssignmentConfigs(100L,
- 2,
- 0,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+ final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
@@ -249,11 +247,7 @@ public class TaskAssignorConvergenceTest {
final int maxWarmupReplicas = 2;
final int numStandbyReplicas = 0;
- final AssignmentConfigs configs = new AssignmentConfigs(100L,
- maxWarmupReplicas,
- numStandbyReplicas,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+ final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5);
testForConvergence(harness, configs, 1);
@@ -272,11 +266,7 @@ public class TaskAssignorConvergenceTest {
final int maxWarmupReplicas = 2;
final int numStandbyReplicas = 0;
- final AssignmentConfigs configs = new AssignmentConfigs(100L,
- maxWarmupReplicas,
- numStandbyReplicas,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
+ final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5);
testForConvergence(harness, configs, 1);
@@ -314,9 +304,11 @@ public class TaskAssignorConvergenceTest {
final int numberOfEvents = prng.nextInt(10) + 1;
- final AssignmentConfigs configs = new AssignmentConfigs(100L,
+ final AssignmentConfigs configs = new AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
maxWarmupReplicas,
numStandbyReplicas,
+ false,
+ 90_000L,
60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
@@ -427,5 +419,4 @@ public class TaskAssignorConvergenceTest {
}
}
-
}