You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/01 14:59:41 UTC
[kafka] branch trunk updated: KAFKA-6145: KIP 441 remove balance
factor (#8597)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95edaba KAFKA-6145: KIP 441 remove balance factor (#8597)
95edaba is described below
commit 95edaba8615d4ca2d623722eee38eb78fc24d317
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri May 1 07:59:08 2020 -0700
KAFKA-6145: KIP 441 remove balance factor (#8597)
Reviewers: John Roesler <vv...@apache.org>
---
.../org/apache/kafka/streams/StreamsConfig.java | 12 +---
.../internals/StreamsPartitionAssignor.java | 4 --
.../assignment/AssignorConfiguration.java | 4 --
.../internals/assignment/BalancedAssignor.java | 3 +-
.../assignment/DefaultBalancedAssignor.java | 10 ++--
.../assignment/HighAvailabilityTaskAssignor.java | 3 +-
.../apache/kafka/streams/StreamsConfigTest.java | 12 ----
.../internals/StreamsPartitionAssignorTest.java | 2 -
.../assignment/DefaultBalancedAssignorTest.java | 69 +++-------------------
.../assignment/FallbackPriorTaskAssignorTest.java | 2 +-
.../HighAvailabilityTaskAssignorTest.java | 5 --
.../assignment/StickyTaskAssignorTest.java | 4 +-
.../assignment/TaskAssignorConvergenceTest.java | 4 --
13 files changed, 18 insertions(+), 116 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 1df6839..4688848 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -329,10 +329,6 @@ public class StreamsConfig extends AbstractConfig {
public static final String APPLICATION_SERVER_CONFIG = "application.server";
private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance.";
- /** {@code balance.factor} */
- public static final String BALANCE_FACTOR_CONFIG = "balance.factor";
- private static final String BALANCE_FACTOR_DOC = "Maximum difference in the number of stateful (and total) active tasks assigned to the stream thread with the most tasks and the stream thread with the least in a steady-state assignment. Must be at least 1.";
-
/** {@code bootstrap.servers} */
@SuppressWarnings("WeakerAccess")
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@@ -462,7 +458,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code probing.rebalance.interval.ms} */
public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG = "probing.rebalance.interval.ms";
private static final String PROBING_REBALANCE_INTERVAL_MS_DOC = "The maximum time to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances " +
- "will continue to be triggered until the assignment is balanced according to the " + BALANCE_FACTOR_CONFIG + ". Must be at least 1 minute.";
+ "will continue to be triggered until the assignment is balanced. Must be at least 1 minute.";
/** {@code processing.guarantee} */
@SuppressWarnings("WeakerAccess")
@@ -681,12 +677,6 @@ public class StreamsConfig extends AbstractConfig {
"",
Importance.LOW,
APPLICATION_SERVER_DOC)
- .define(BALANCE_FACTOR_CONFIG,
- Type.INT,
- 1,
- atLeast(1),
- Importance.LOW,
- BALANCE_FACTOR_DOC)
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Type.INT,
1000,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 666da21..242cff7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1572,10 +1572,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
return assignmentConfigs.acceptableRecoveryLag;
}
- int balanceFactor() {
- return assignmentConfigs.balanceFactor;
- }
-
int maxWarmupReplicas() {
return assignmentConfigs.maxWarmupReplicas;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 2a5d1d4..8ea63fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -342,7 +342,6 @@ public final class AssignorConfiguration {
public static class AssignmentConfigs {
public final long acceptableRecoveryLag;
- public final int balanceFactor;
public final int maxWarmupReplicas;
public final int numStandbyReplicas;
public final long probingRebalanceIntervalMs;
@@ -350,7 +349,6 @@ public final class AssignorConfiguration {
private AssignmentConfigs(final StreamsConfig configs) {
this(
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
- configs.getInt(StreamsConfig.BALANCE_FACTOR_CONFIG),
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG)
@@ -358,12 +356,10 @@ public final class AssignorConfiguration {
}
AssignmentConfigs(final Long acceptableRecoveryLag,
- final Integer balanceFactor,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
final Long probingRebalanceIntervalMs) {
this.acceptableRecoveryLag = acceptableRecoveryLag;
- this.balanceFactor = balanceFactor;
this.maxWarmupReplicas = maxWarmupReplicas;
this.numStandbyReplicas = numStandbyReplicas;
this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
index 5c2136a..70c9070 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
@@ -26,6 +26,5 @@ public interface BalancedAssignor {
Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
final SortedSet<TaskId> tasks,
- final Map<UUID, Integer> clientsToNumberOfStreamThreads,
- final int balanceFactor);
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
index 60d96c6..c75c831 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
@@ -31,12 +31,11 @@ public class DefaultBalancedAssignor implements BalancedAssignor {
@Override
public Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
final SortedSet<TaskId> tasks,
- final Map<UUID, Integer> clientsToNumberOfStreamThreads,
- final int balanceFactor) {
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads) {
final Map<UUID, List<TaskId>> assignment = new HashMap<>();
clients.forEach(client -> assignment.put(client, new ArrayList<>()));
distributeTasksEvenlyOverClients(assignment, clients, tasks);
- balanceTasksOverStreamThreads(assignment, clients, clientsToNumberOfStreamThreads, balanceFactor);
+ balanceTasksOverStreamThreads(assignment, clients, clientsToNumberOfStreamThreads);
return assignment;
}
@@ -58,8 +57,7 @@ public class DefaultBalancedAssignor implements BalancedAssignor {
private void balanceTasksOverStreamThreads(final Map<UUID, List<TaskId>> assignment,
final SortedSet<UUID> clients,
- final Map<UUID, Integer> clientsToNumberOfStreamThreads,
- final int balanceFactor) {
+ final Map<UUID, Integer> clientsToNumberOfStreamThreads) {
boolean stop = false;
while (!stop) {
stop = true;
@@ -74,7 +72,7 @@ public class DefaultBalancedAssignor implements BalancedAssignor {
destinationTasks.size() / clientsToNumberOfStreamThreads.get(destinationClient);
final int assignedTasksPerStreamThreadAtSource =
sourceTasks.size() / clientsToNumberOfStreamThreads.get(sourceClient);
- if (assignedTasksPerStreamThreadAtSource - assignedTasksPerStreamThreadAtDestination > balanceFactor) {
+ if (assignedTasksPerStreamThreadAtSource - assignedTasksPerStreamThreadAtDestination > 1) {
final Iterator<TaskId> sourceIterator = sourceTasks.iterator();
final TaskId taskToMove = sourceIterator.next();
sourceIterator.remove();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index 5dbf099..404b793 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -99,8 +99,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
final Map<UUID, List<TaskId>> statefulActiveTaskAssignment = new DefaultBalancedAssignor().assign(
sortedClients,
statefulTasks,
- clientsToNumberOfThreads,
- configs.balanceFactor
+ clientsToNumberOfThreads
);
return assignTaskMovements(
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a6701b0..0b4e614 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -939,18 +939,6 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSetDefaultBalanceFactor() {
- final StreamsConfig config = new StreamsConfig(props);
- assertThat(config.getInt(StreamsConfig.BALANCE_FACTOR_CONFIG), is(1));
- }
-
- @Test
- public void shouldThrowConfigExceptionIfBalanceFactorIsOutsideBounds() {
- props.put(StreamsConfig.BALANCE_FACTOR_CONFIG, 0);
- assertThrows(ConfigException.class, () -> new StreamsConfig(props));
- }
-
- @Test
public void shouldSetDefaultNumStandbyReplicas() {
final StreamsConfig config = new StreamsConfig(props);
assertThat(config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), is(0));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 814bbcf..840d48e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1743,7 +1743,6 @@ public class StreamsPartitionAssignorTest {
final Map<String, Object> props = configProps();
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 11);
- props.put(StreamsConfig.BALANCE_FACTOR_CONFIG, 22);
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 33);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 44);
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 55 * 60 * 1000L);
@@ -1751,7 +1750,6 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.configure(props);
assertThat(partitionAssignor.acceptableRecoveryLag(), equalTo(11L));
- assertThat(partitionAssignor.balanceFactor(), equalTo(22));
assertThat(partitionAssignor.maxWarmupReplicas(), equalTo(33));
assertThat(partitionAssignor.numStandbyReplicas(), equalTo(44));
assertThat(partitionAssignor.probingRebalanceIntervalMs(), equalTo(55 * 60 * 1000L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
index b45a33f..9a3c661 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
@@ -51,8 +51,6 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
@@ -66,8 +64,7 @@ public class DefaultBalancedAssignorTest {
TASK_2_1,
TASK_2_2
),
- threeClientsToNumberOfStreamThreads(1, 1, 1),
- balanceFactor
+ threeClientsToNumberOfStreamThreads(1, 1, 1)
);
final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -81,8 +78,6 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfClientsNotIntegralDivisorOfNumberOfTasks() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
TWO_CLIENTS,
mkSortedSet(
@@ -96,8 +91,7 @@ public class DefaultBalancedAssignorTest {
TASK_2_1,
TASK_2_2
),
- twoClientsToNumberOfStreamThreads(1, 1),
- balanceFactor
+ twoClientsToNumberOfStreamThreads(1, 1)
);
final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2);
@@ -110,8 +104,6 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfStreamThreadsIntegralDivisorOfNumberOfTasks() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
@@ -125,8 +117,7 @@ public class DefaultBalancedAssignorTest {
TASK_2_1,
TASK_2_2
),
- threeClientsToNumberOfStreamThreads(3, 3, 3),
- balanceFactor
+ threeClientsToNumberOfStreamThreads(3, 3, 3)
);
final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -140,8 +131,6 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverClientsWhereNumberOfStreamThreadsNotIntegralDivisorOfNumberOfTasks() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
@@ -155,8 +144,7 @@ public class DefaultBalancedAssignorTest {
TASK_2_1,
TASK_2_2
),
- threeClientsToNumberOfStreamThreads(2, 2, 2),
- balanceFactor
+ threeClientsToNumberOfStreamThreads(2, 2, 2)
);
final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -170,8 +158,6 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverUnevenlyDistributedStreamThreads() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
@@ -185,8 +171,7 @@ public class DefaultBalancedAssignorTest {
TASK_2_1,
TASK_2_2
),
- threeClientsToNumberOfStreamThreads(1, 2, 3),
- balanceFactor
+ threeClientsToNumberOfStreamThreads(1, 2, 3)
);
final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_1_0, TASK_2_0);
@@ -200,16 +185,13 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverClientsWithLessClientsThanTasks() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
TASK_0_0,
TASK_0_1
),
- threeClientsToNumberOfStreamThreads(1, 1, 1),
- balanceFactor
+ threeClientsToNumberOfStreamThreads(1, 1, 1)
);
final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_0);
@@ -223,8 +205,6 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
THREE_CLIENTS,
mkSortedSet(
@@ -238,8 +218,7 @@ public class DefaultBalancedAssignorTest {
TASK_2_1,
TASK_2_2
),
- threeClientsToNumberOfStreamThreads(6, 6, 6),
- balanceFactor
+ threeClientsToNumberOfStreamThreads(6, 6, 6)
);
final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -253,8 +232,6 @@ public class DefaultBalancedAssignorTest {
@Test
public void shouldAssignTasksEvenlyOverStreamThreadsButBestEffortOverClients() {
- final int balanceFactor = 1;
-
final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
TWO_CLIENTS,
mkSortedSet(
@@ -268,8 +245,7 @@ public class DefaultBalancedAssignorTest {
TASK_2_1,
TASK_2_2
),
- twoClientsToNumberOfStreamThreads(6, 2),
- balanceFactor
+ twoClientsToNumberOfStreamThreads(6, 2)
);
final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2,
@@ -281,35 +257,6 @@ public class DefaultBalancedAssignorTest {
);
}
- @Test
- public void shouldAssignTasksEvenlyOverClientsButNotOverStreamThreadsBecauseBalanceFactorSatisfied() {
- final int balanceFactor = 2;
-
- final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
- TWO_CLIENTS,
- mkSortedSet(
- TASK_0_0,
- TASK_0_1,
- TASK_0_2,
- TASK_1_0,
- TASK_1_1,
- TASK_1_2,
- TASK_2_0,
- TASK_2_1,
- TASK_2_2
- ),
- twoClientsToNumberOfStreamThreads(6, 2),
- balanceFactor
- );
-
- final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2);
- final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_0, TASK_1_2, TASK_2_1);
- assertThat(
- assignment,
- is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2))
- );
- }
-
private static Map<UUID, Integer> twoClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
final int numberOfStreamThread2) {
return mkMap(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
index 687e5b6..9cd2da1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
@@ -53,7 +53,7 @@ public class FallbackPriorTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0, 0L)
+ new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0L)
);
assertThat(probingRebalanceNeeded, is(true));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index d3fcbbe..dd9b79b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -61,7 +61,6 @@ import static org.hamcrest.Matchers.not;
public class HighAvailabilityTaskAssignorTest {
private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
/*acceptableRecoveryLag*/ 100L,
- /*balanceFactor*/ 1,
/*maxWarmupReplicas*/ 2,
/*numStandbyReplicas*/ 0,
/*probingRebalanceIntervalMs*/ 60 * 1000L
@@ -69,13 +68,11 @@ public class HighAvailabilityTaskAssignorTest {
private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
/*acceptableRecoveryLag*/ 100L,
- /*balanceFactor*/ 1,
/*maxWarmupReplicas*/ 2,
/*numStandbyReplicas*/ 1,
/*probingRebalanceIntervalMs*/ 60 * 1000L
);
-
@Test
public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks() {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1);
@@ -286,7 +283,6 @@ public class HighAvailabilityTaskAssignorTest {
statefulTasks,
new AssignmentConfigs(
/*acceptableRecoveryLag*/ 100L,
- /*balanceFactor*/ 1,
/*maxWarmupReplicas*/ 1,
/*numStandbyReplicas*/ 0,
/*probingRebalanceIntervalMs*/ 60 * 1000L
@@ -315,7 +311,6 @@ public class HighAvailabilityTaskAssignorTest {
statefulTasks,
new AssignmentConfigs(
/*acceptableRecoveryLag*/ 100L,
- /*balanceFactor*/ 1,
/*maxWarmupReplicas*/ 1,
/*numStandbyReplicas*/ 1,
/*probingRebalanceIntervalMs*/ 60 * 1000L
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 5203832..85c3947 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -668,7 +668,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0, 0L)
+ new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0L)
);
assertThat(probingRebalanceNeeded, is(false));
@@ -687,7 +687,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
- new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, numStandbys, 0L)
+ new AssignorConfiguration.AssignmentConfigs(0L, 0, numStandbys, 0L)
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 9517400..c85eac7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -232,7 +232,6 @@ public class TaskAssignorConvergenceTest {
@Test
public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
final AssignmentConfigs configs = new AssignmentConfigs(100L,
- 1,
2,
0,
1000L);
@@ -251,7 +250,6 @@ public class TaskAssignorConvergenceTest {
final int numStandbyReplicas = 0;
final AssignmentConfigs configs = new AssignmentConfigs(100L,
- 1,
maxWarmupReplicas,
numStandbyReplicas,
1000L);
@@ -273,7 +271,6 @@ public class TaskAssignorConvergenceTest {
final int numStandbyReplicas = 0;
final AssignmentConfigs configs = new AssignmentConfigs(100L,
- 1,
maxWarmupReplicas,
numStandbyReplicas,
1000L);
@@ -314,7 +311,6 @@ public class TaskAssignorConvergenceTest {
final int numberOfEvents = prng.nextInt(10) + 1;
final AssignmentConfigs configs = new AssignmentConfigs(100L,
- 1,
maxWarmupReplicas,
numStandbyReplicas,
1000L);