You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/14 20:01:07 UTC
[kafka] branch 3.5 updated: Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.5 by this push:
new b14ebf9e9cf Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527)
b14ebf9e9cf is described below
commit b14ebf9e9cf310ff03dfe9ca3d2cdd7fdafc8944
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 14 12:08:49 2023 -0700
Revert "KAFKA-14318: KIP-878, Introduce partition autoscaling configs (#12962)" (#13527)
This reverts commit d9b139220ee253da673af44d58dc87bd184188f1.
KIP-878 implementation did not make any progress, so we need to revert
the public API changes which are not functional right now.
Reviewers: Bill Bejeck <bi...@confluent.io>
---
.../org/apache/kafka/streams/StreamsConfig.java | 25 -----
.../assignment/AssignorConfiguration.java | 12 ---
.../apache/kafka/streams/StreamsConfigTest.java | 16 ----
.../internals/assignment/AssignmentTestUtils.java | 88 ------------------
.../assignment/AssignorConfigurationTest.java | 2 +-
.../ClientTagAwareStandbyTaskAssignorTest.java | 2 -
.../assignment/FallbackPriorTaskAssignorTest.java | 2 +-
.../HighAvailabilityTaskAssignorTest.java | 102 +++++++++++++--------
.../assignment/StandbyTaskAssignorFactoryTest.java | 4 -
.../assignment/StickyTaskAssignorTest.java | 4 +-
.../assignment/TaskAssignorConvergenceTest.java | 25 +++--
11 files changed, 84 insertions(+), 198 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bb0c519d380..6eb34dfd9dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -631,18 +631,6 @@ public class StreamsConfig extends AbstractConfig {
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
- /** {@code partition.autoscaling.enabled} */
- @SuppressWarnings("WeakerAccess")
- public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
- private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling the partitions of internal topics which are managed by Streams."
- + " If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input "
- + "topic(s) will result in the internal topic(s) automatically being expanded to match.";
-
- /** {@code partition.autoscaling.timeout.ms} */
- @SuppressWarnings("WeakerAccess")
- public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
- private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";
-
/** {@code poll.ms} */
@SuppressWarnings("WeakerAccess")
public static final String POLL_MS_CONFIG = "poll.ms";
@@ -1014,17 +1002,6 @@ public class StreamsConfig extends AbstractConfig {
true,
Importance.LOW,
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
- .define(PARTITION_AUTOSCALING_ENABLED_CONFIG,
- Type.BOOLEAN,
- false,
- Importance.LOW,
- PARTITION_AUTOSCALING_ENABLED_DOC)
- .define(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG,
- Type.LONG,
- 15 * 60 * 1000L,
- atLeast(0),
- Importance.LOW,
- PARTITION_AUTOSCALING_TIMEOUT_MS_DOC)
.define(POLL_MS_CONFIG,
Type.LONG,
100L,
@@ -1563,8 +1540,6 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
- consumerProps.put(PARTITION_AUTOSCALING_ENABLED_CONFIG, getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
- consumerProps.put(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG));
consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 3825a33eb34..d40489eab29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -261,16 +261,10 @@ public final class AssignorConfiguration {
void onAssignmentComplete(final boolean stable);
}
- /**
- * NOTE: any StreamsConfig you add here MUST be passed in to the consumer via
- * {@link StreamsConfig#getMainConsumerConfigs}
- */
public static class AssignmentConfigs {
public final long acceptableRecoveryLag;
public final int maxWarmupReplicas;
public final int numStandbyReplicas;
- public final boolean partitionAutoscalingEnabled;
- public final long partitionAutoscalingTimeoutMs;
public final long probingRebalanceIntervalMs;
public final List<String> rackAwareAssignmentTags;
@@ -278,8 +272,6 @@ public final class AssignorConfiguration {
acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
- partitionAutoscalingEnabled = configs.getBoolean(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG);
- partitionAutoscalingTimeoutMs = configs.getLong(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG);
probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
}
@@ -287,15 +279,11 @@ public final class AssignorConfiguration {
AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
- final boolean partitionAutoscalingEnabled,
- final long partitionAutoscalingTimeoutMs,
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags) {
this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
- this.partitionAutoscalingEnabled = validated(StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG, partitionAutoscalingEnabled);
- this.partitionAutoscalingTimeoutMs = validated(StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, partitionAutoscalingTimeoutMs);
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 29c1be977c5..05582b74aee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -62,8 +62,6 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
-import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_ENABLED_CONFIG;
-import static org.apache.kafka.streams.StreamsConfig.PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
@@ -1371,20 +1369,6 @@ public class StreamsConfigTest {
assertEquals(0, configs.size());
}
- @Test
- public void shouldEnablePartitionAutoscaling() {
- props.put("partition.autoscaling.enabled", true);
- final StreamsConfig config = new StreamsConfig(props);
- assertTrue(config.getBoolean(PARTITION_AUTOSCALING_ENABLED_CONFIG));
- }
-
- @Test
- public void shouldSetPartitionAutoscalingTimeout() {
- props.put("partition.autoscaling.timeout.ms", 0L);
- final StreamsConfig config = new StreamsConfig(props);
- assertThat(config.getLong(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG), is(0L));
- }
-
@Test
public void shouldReturnDefaultClientSupplier() {
final KafkaClientSupplier supplier = streamsConfig.getKafkaClientSupplier();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 181bedab6f5..8993372dd5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
@@ -113,93 +112,6 @@ public final class AssignmentTestUtils {
private AssignmentTestUtils() {}
- public static final long ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT = 100;
-
- public static AssignmentConfigs getDefaultConfigsWithZeroStandbys() {
- return new AssignmentConfigs(
- ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
- 2,
- 0,
- false,
- 90_000L,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
- }
-
- public static AssignmentConfigs getDefaultConfigsWithOneStandbys() {
- return new AssignmentConfigs(
- ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
- 2,
- 1,
- false,
- 90_000L,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
- }
-
- public static AssignmentConfigs getConfigsWithZeroStandbysAndWarmups(final int maxWarmups) {
- return new AssignmentConfigs(
- ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
- maxWarmups,
- 0,
- false,
- 90_000L,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
- }
-
- public static AssignmentConfigs getConfigsWithOneStandbysAndWarmups(final int maxWarmups) {
- return new AssignmentConfigs(
- ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
- maxWarmups,
- 1,
- false,
- 90_000L,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
- }
-
- public static AssignmentConfigs getConfigsWithOneStandbysAndZeroLagAndWarmups(final int maxWarmups) {
- return new AssignmentConfigs(
- 0L,
- maxWarmups,
- 1,
- false,
- 90_000L,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
- }
-
- public static AssignmentConfigs getConfigsWithZeroStandbysAndZeroLagAndWarmups(final int maxWarmups) {
- return new AssignmentConfigs(
- 0L,
- maxWarmups,
- 0,
- false,
- 90_000L,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
- }
-
- public static AssignmentConfigs getConfigsWithOneStandbysAndLagAndWarmups(final long acceptableRecoveryLag,
- final int maxWarmups) {
- return new AssignmentConfigs(
- acceptableRecoveryLag,
- maxWarmups,
- 1,
- false,
- 90_000L,
- 60_000L,
- EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
- );
- }
-
static Map<UUID, ClientState> getClientStatesMap(final ClientState... states) {
final Map<UUID, ClientState> clientStates = new HashMap<>();
int nthState = 1;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
index ede16ce6372..9ff53b54244 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
@@ -30,7 +30,7 @@ public class AssignorConfigurationTest {
public void configsShouldRejectZeroWarmups() {
final ConfigException exception = assertThrows(
ConfigException.class,
- () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, false, 1L, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
index 2b87dc09064..631430c6a82 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
@@ -656,8 +656,6 @@ public class ClientTagAwareStandbyTaskAssignorTest {
return new AssignmentConfigs(0L,
1,
numStandbyReplicas,
- false,
- 1L,
60000L,
asList(rackAwareAssignmentTags));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
index a43eb5fb857..0473d9bee45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
@@ -54,7 +54,7 @@ public class FallbackPriorTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 1L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(probingRebalanceNeeded, is(true));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index 708f6e3afb0..90e0fed51f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.junit.Test;
import java.util.HashMap;
@@ -33,6 +34,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
@@ -54,12 +56,6 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndLagAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithOneStandbysAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getConfigsWithZeroStandbysAndZeroLagAndWarmups;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithOneStandbys;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
@@ -71,7 +67,23 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
-public class HighAvailabilityTaskAssignorTest {
+public class HighAvailabilityTaskAssignorTest {
+ private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 2,
+ /*numStandbyReplicas*/ 0,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+
+ private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 2,
+ /*numStandbyReplicas*/ 1,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ );
+
@Test
public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
@@ -89,7 +101,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithOneStandbysAndLagAndWarmups(11L, 2)
+ new AssignmentConfigs(11L, 2, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
@@ -118,7 +130,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithOneStandbysAndLagAndWarmups(Long.MAX_VALUE, 1)
+ new AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(clientState1, hasAssignedTasks(6));
@@ -139,7 +151,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -160,7 +172,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@@ -180,7 +192,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@@ -202,7 +214,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@@ -231,7 +243,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@@ -253,7 +265,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@@ -278,7 +290,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
+ new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(true));
@@ -308,7 +320,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(allTaskIds.size() / 3 + 1)
+ new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@@ -329,7 +341,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@@ -349,7 +361,7 @@ public class HighAvailabilityTaskAssignorTest {
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
allTasks,
singleton(TASK_0_0),
- getDefaultConfigsWithZeroStandbys());
+ configWithoutStandbys);
assertThat(probingRebalanceNeeded, is(false));
assertThat(client1, hasActiveTasks(2));
@@ -372,7 +384,7 @@ public class HighAvailabilityTaskAssignorTest {
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
allTasks,
statefulTasks,
- getDefaultConfigsWithOneStandbys());
+ configWithStandbys);
assertThat(clientStates.get(UUID_2).standbyTasks(), not(empty()));
assertThat(probingRebalanceNeeded, is(false));
@@ -394,7 +406,7 @@ public class HighAvailabilityTaskAssignorTest {
);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
assertThat(clientStates.get(UUID_1).activeTasks(), is(singleton(TASK_0_1)));
assertThat(clientStates.get(UUID_2).activeTasks(), is(singleton(TASK_0_0)));
@@ -421,7 +433,7 @@ public class HighAvailabilityTaskAssignorTest {
);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
assertThat(clientStates.get(UUID_1).activeTasks(), is(emptySet()));
assertThat(clientStates.get(UUID_2).activeTasks(), is(emptySet()));
@@ -448,7 +460,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0)));
@@ -468,7 +480,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
assertThat(client1.activeTaskCount(), equalTo(1));
@@ -486,7 +498,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
@@ -509,7 +521,13 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- getConfigsWithZeroStandbysAndWarmups(1)
+ new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 1,
+ /*numStandbyReplicas*/ 0,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ )
);
@@ -532,7 +550,13 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- getConfigsWithOneStandbysAndWarmups(1)
+ new AssignmentConfigs(
+ /*acceptableRecoveryLag*/ 100L,
+ /*maxWarmupReplicas*/ 1,
+ /*numStandbyReplicas*/ 1,
+ /*probingRebalanceIntervalMs*/ 60 * 1000L,
+ /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
+ )
);
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
@@ -550,7 +574,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
assertHasNoStandbyTasks(client1);
@@ -566,7 +590,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
assertThat(client1.activeTasks(), equalTo(mkSet(TASK_0_0, TASK_0_1)));
assertHasNoStandbyTasks(client1);
assertThat(probingRebalanceNeeded, is(false));
@@ -583,7 +607,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
assertValidAssignment(
1,
@@ -608,7 +632,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithOneStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithStandbys);
assertValidAssignment(
1,
2,
@@ -641,7 +665,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
assertThat(client1.activeTasks(), not(empty()));
assertThat(client2.activeTasks(), not(empty()));
@@ -658,7 +682,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
assertThat(probingRebalanceNeeded, is(false));
assertThat(client1.activeTasks(), equalTo(client1.prevActiveTasks()));
@@ -674,7 +698,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
assertThat(probingRebalanceNeeded, is(false));
assertHasNoStandbyTasks(client1, client2);
}
@@ -688,7 +712,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2);
final boolean probingRebalanceNeeded =
- new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, getDefaultConfigsWithZeroStandbys());
+ new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, configWithoutStandbys);
assertThat(probingRebalanceNeeded, is(true));
assertThat(client2.standbyTaskCount(), equalTo(1));
}
@@ -710,7 +734,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
@@ -741,7 +765,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
@@ -772,7 +796,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
@@ -803,7 +827,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
- getConfigsWithZeroStandbysAndZeroLagAndWarmups(1)
+ new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
index 0c22cd290e7..fdd7fa1d473 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java
@@ -28,8 +28,6 @@ public class StandbyTaskAssignorFactoryTest {
private static final long ACCEPTABLE_RECOVERY_LAG = 0L;
private static final int MAX_WARMUP_REPLICAS = 1;
private static final int NUMBER_OF_STANDBY_REPLICAS = 1;
- private static final boolean PARTITION_AUTOSCALING_ENABLED = false;
- private static final long PARTITION_AUTOSCALING_TIMEOUT_MS = 90000L;
private static final long PROBING_REBALANCE_INTERVAL_MS = 60000L;
@Test
@@ -48,8 +46,6 @@ public class StandbyTaskAssignorFactoryTest {
return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG,
MAX_WARMUP_REPLICAS,
NUMBER_OF_STANDBY_REPLICAS,
- PARTITION_AUTOSCALING_ENABLED,
- PARTITION_AUTOSCALING_TIMEOUT_MS,
PROBING_REBALANCE_INTERVAL_MS,
rackAwareAssignmentTags);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 82e3de4baf7..8c1347f22d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -677,7 +677,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(probingRebalanceNeeded, is(false));
@@ -696,7 +696,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, false, 90_000L, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+ new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 2b1564fa1a1..c1be5f33fa2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -29,13 +29,11 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Supplier;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.appendClientStates;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getDefaultConfigsWithZeroStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
import static org.junit.Assert.fail;
@@ -231,7 +229,11 @@ public class TaskAssignorConvergenceTest {
@Test
public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
- final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
+ final AssignmentConfigs configs = new AssignmentConfigs(100L,
+ 2,
+ 0,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
@@ -247,7 +249,11 @@ public class TaskAssignorConvergenceTest {
final int maxWarmupReplicas = 2;
final int numStandbyReplicas = 0;
- final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
+ final AssignmentConfigs configs = new AssignmentConfigs(100L,
+ maxWarmupReplicas,
+ numStandbyReplicas,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5);
testForConvergence(harness, configs, 1);
@@ -266,7 +272,11 @@ public class TaskAssignorConvergenceTest {
final int maxWarmupReplicas = 2;
final int numStandbyReplicas = 0;
- final AssignmentConfigs configs = getDefaultConfigsWithZeroStandbys();
+ final AssignmentConfigs configs = new AssignmentConfigs(100L,
+ maxWarmupReplicas,
+ numStandbyReplicas,
+ 60_000L,
+ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5);
testForConvergence(harness, configs, 1);
@@ -304,11 +314,9 @@ public class TaskAssignorConvergenceTest {
final int numberOfEvents = prng.nextInt(10) + 1;
- final AssignmentConfigs configs = new AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG_TEST_DEFAULT,
+ final AssignmentConfigs configs = new AssignmentConfigs(100L,
maxWarmupReplicas,
numStandbyReplicas,
- false,
- 90_000L,
60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
@@ -419,4 +427,5 @@ public class TaskAssignorConvergenceTest {
}
}
+
}