You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/05/09 13:44:09 UTC
[kafka] branch trunk updated: KAFKA-13763: Refactor IncrementalCooperativeAssignor for improved unit testing (#11983)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1278e385c0 KAFKA-13763: Refactor IncrementalCooperativeAssignor for improved unit testing (#11983)
1278e385c0 is described below
commit 1278e385c02a44861e9d00b32c0f476d4b149be5
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Mon May 9 09:43:47 2022 -0400
KAFKA-13763: Refactor IncrementalCooperativeAssignor for improved unit testing (#11983)
The goals here include:
1. Create an overloaded variant of the IncrementalCooperativeAssignor::performTaskAssignment method that is more testing friendly
2. Simplify the parameter list for the IncrementalCooperativeAssignor::handleLostAssignments method, which in turn simplifies the logic for testing this class
3. Capture repeated Java 8 streams logic in simple, reusable, easily-verifiable utility methods added to the ConnectUtils class
Reviewers: Luke Chen <sh...@gmail.com>
---
.../runtime/distributed/ConnectAssignor.java | 2 +-
.../IncrementalCooperativeAssignor.java | 280 ++++++++---
.../IncrementalCooperativeConnectProtocol.java | 9 +-
.../runtime/distributed/WorkerCoordinator.java | 23 +
.../apache/kafka/connect/util/ConnectUtils.java | 31 ++
.../ConnectProtocolCompatibilityTest.java | 91 ++--
.../IncrementalCooperativeAssignorTest.java | 528 +++++++++------------
.../WorkerCoordinatorIncrementalTest.java | 4 +-
8 files changed, 515 insertions(+), 453 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
index 752e62e680..1436460d1a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
@@ -32,7 +32,7 @@ public interface ConnectAssignor {
* method computes an assignment of connectors and tasks among the members of the worker group.
*
* @param leaderId the leader of the group
- * @param protocol the protocol type; for Connect assignors this is normally "connect"
+ * @param protocol the protocol type; for Connect assignors this is "eager", "compatible", or "sessioned"
* @param allMemberMetadata the metadata of all the active workers of the group
* @param coordinator the worker coordinator that runs this assignor
* @return the assignment of connectors and tasks to workers
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index e6a8b302b4..8ed80903c5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -16,11 +16,13 @@
*/
package org.apache.kafka.connect.runtime.distributed;
+import java.util.Arrays;
import java.util.Map.Entry;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -43,9 +45,10 @@ import java.util.stream.IntStream;
import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
+import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
/**
* An assignor that computes a distribution of connectors and tasks according to the incremental
@@ -104,18 +107,15 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
maxOffset, coordinator.configSnapshot().offset());
- short protocolVersion = memberConfigs.values().stream()
- .allMatch(state -> state.assignment().version() == CONNECT_PROTOCOL_V2)
- ? CONNECT_PROTOCOL_V2
- : CONNECT_PROTOCOL_V1;
+ short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();
Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
if (leaderOffset == null) {
Map<String, ExtendedAssignment> assignments = fillAssignments(
memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
- leaderId, memberConfigs.get(leaderId).url(), maxOffset, Collections.emptyMap(),
- Collections.emptyMap(), Collections.emptyMap(), 0, protocolVersion);
- return serializeAssignments(assignments);
+ leaderId, memberConfigs.get(leaderId).url(), maxOffset,
+ ClusterAssignment.EMPTY, 0, protocolVersion);
+ return serializeAssignments(assignments, protocolVersion);
}
return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion);
}
@@ -159,11 +159,41 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
WorkerCoordinator coordinator, short protocolVersion) {
log.debug("Performing task assignment during generation: {} with memberId: {}",
coordinator.generationId(), coordinator.memberId());
+ Map<String, ConnectorsAndTasks> memberAssignments = transformValues(
+ memberConfigs,
+ memberConfig -> new ConnectorsAndTasks.Builder()
+ .with(memberConfig.assignment().connectors(), memberConfig.assignment().tasks())
+ .build()
+ );
+ ClusterAssignment clusterAssignment = performTaskAssignment(
+ coordinator.configSnapshot(),
+ coordinator.lastCompletedGenerationId(),
+ coordinator.generationId(),
+ memberAssignments
+ );
+
+ coordinator.leaderState(new LeaderState(memberConfigs, clusterAssignment.allAssignedConnectors(), clusterAssignment.allAssignedTasks()));
+ Map<String, ExtendedAssignment> assignments =
+ fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
+ memberConfigs.get(leaderId).url(), maxOffset,
+ clusterAssignment,
+ delay, protocolVersion);
+
+ log.debug("Actual assignments: {}", assignments);
+ return serializeAssignments(assignments, protocolVersion);
+ }
+
+ // Visible for testing
+ ClusterAssignment performTaskAssignment(
+ ClusterConfigState configSnapshot,
+ int lastCompletedGenerationId,
+ int currentGenerationId,
+ Map<String, ConnectorsAndTasks> memberAssignments
+ ) {
// Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
// can be used to calculate derived sets
log.debug("Previous assignments: {}", previousAssignment);
- int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
if (previousGenerationId != lastCompletedGenerationId) {
log.debug("Clearing the view of previous assignments due to generation mismatch between "
+ "previous generation ID {} and last completed generation ID {}. This can "
@@ -175,11 +205,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
this.previousAssignment = ConnectorsAndTasks.EMPTY;
}
- ClusterConfigState snapshot = coordinator.configSnapshot();
- Set<String> configuredConnectors = new TreeSet<>(snapshot.connectors());
- Set<ConnectorTaskId> configuredTasks = configuredConnectors.stream()
- .flatMap(c -> snapshot.tasks(c).stream())
- .collect(Collectors.toSet());
+ Set<String> configuredConnectors = new TreeSet<>(configSnapshot.connectors());
+ Set<ConnectorTaskId> configuredTasks = combineCollections(configuredConnectors, configSnapshot::tasks, Collectors.toSet());
// Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
// be used to calculate derived sets
@@ -189,7 +216,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
// Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
// used to calculate derived sets
- ConnectorsAndTasks activeAssignments = assignment(memberConfigs);
+ ConnectorsAndTasks activeAssignments = assignment(memberAssignments);
log.debug("Active assignments: {}", activeAssignments);
// This means that a previous revocation did not take effect. In this case, reset
@@ -225,7 +252,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
log.debug("New assignments: {}", newSubmissions);
// A collection of the complete assignment
- List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
+ List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberAssignments, ConnectorsAndTasks.EMPTY);
log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);
// Per worker connector assignments without removing deleted connectors yet
@@ -239,23 +266,23 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments);
// A collection of the current assignment excluding the connectors-and-tasks to be deleted
- List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberConfigs, deleted);
+ List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberAssignments, deleted);
Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
log.debug("Connector and task to delete assignments: {}", toRevoke);
// Revoking redundant connectors/tasks if the workers have duplicate assignments
- toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments));
+ toRevoke.putAll(computeDuplicatedAssignments(memberAssignments, connectorAssignments, taskAssignments));
log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);
// Recompute the complete assignment excluding the deleted connectors-and-tasks
- completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
+ completeWorkerAssignment = workerAssignment(memberAssignments, deleted);
connectorAssignments =
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
taskAssignments =
completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
- handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);
+ handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
// Do not revoke resources for re-assignment while a delayed rebalance is active
// Also we do not revoke in two consecutive rebalances by the same leader
@@ -298,20 +325,24 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
diff(taskAssignments, currentTaskAssignments);
+ previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
+ previousGenerationId = currentGenerationId;
+ previousMembers = memberAssignments.keySet();
+
log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
- coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments));
-
- Map<String, ExtendedAssignment> assignments =
- fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
- memberConfigs.get(leaderId).url(), maxOffset, incrementalConnectorAssignments,
- incrementalTaskAssignments, toRevoke, delay, protocolVersion);
- previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
- previousGenerationId = coordinator.generationId();
- previousMembers = memberConfigs.keySet();
- log.debug("Actual assignments: {}", assignments);
- return serializeAssignments(assignments);
+ Map<String, Collection<String>> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
+ Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
+ return new ClusterAssignment(
+ incrementalConnectorAssignments,
+ incrementalTaskAssignments,
+ revokedConnectors,
+ revokedTasks,
+ diff(connectorAssignments, revokedConnectors),
+ diff(taskAssignments, revokedTasks)
+ );
}
private Map<String, ConnectorsAndTasks> computeDeleted(ConnectorsAndTasks deleted,
@@ -344,9 +375,9 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
Map<String, Collection<ConnectorTaskId>> taskAssignments,
ConnectorsAndTasks lostAssignments) {
ConnectorsAndTasks previousAssignment = new ConnectorsAndTasks.Builder().with(
- connectorAssignments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
- taskAssignments.values() .stream() .flatMap(Collection::stream).collect(Collectors.toSet()))
- .build();
+ ConnectUtils.combineCollections(connectorAssignments.values()),
+ ConnectUtils.combineCollections(taskAssignments.values())
+ ).build();
for (ConnectorsAndTasks revoked : toRevoke.values()) {
previousAssignment.connectors().removeAll(revoked.connectors());
@@ -363,29 +394,36 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
return previousAssignment;
}
- private ConnectorsAndTasks duplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs) {
- Set<String> connectors = memberConfigs.entrySet().stream()
- .flatMap(memberConfig -> memberConfig.getValue().assignment().connectors().stream())
- .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
+ private ConnectorsAndTasks duplicatedAssignments(Map<String, ConnectorsAndTasks> memberAssignments) {
+ Map<String, Long> connectorInstanceCounts = combineCollections(
+ memberAssignments.values(),
+ ConnectorsAndTasks::connectors,
+ Collectors.groupingBy(Function.identity(), Collectors.counting())
+ );
+ Set<String> duplicatedConnectors = connectorInstanceCounts
.entrySet().stream()
.filter(entry -> entry.getValue() > 1L)
.map(Entry::getKey)
.collect(Collectors.toSet());
- Set<ConnectorTaskId> tasks = memberConfigs.values().stream()
- .flatMap(state -> state.assignment().tasks().stream())
- .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
+ Map<ConnectorTaskId, Long> taskInstanceCounts = combineCollections(
+ memberAssignments.values(),
+ ConnectorsAndTasks::tasks,
+ Collectors.groupingBy(Function.identity(), Collectors.counting())
+ );
+ Set<ConnectorTaskId> duplicatedTasks = taskInstanceCounts
.entrySet().stream()
.filter(entry -> entry.getValue() > 1L)
.map(Entry::getKey)
.collect(Collectors.toSet());
- return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
+
+ return new ConnectorsAndTasks.Builder().with(duplicatedConnectors, duplicatedTasks).build();
}
- private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs,
+ private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ConnectorsAndTasks> memberAssignments,
Map<String, Collection<String>> connectorAssignments,
Map<String, Collection<ConnectorTaskId>> taskAssignment) {
- ConnectorsAndTasks duplicatedAssignments = duplicatedAssignments(memberConfigs);
+ ConnectorsAndTasks duplicatedAssignments = duplicatedAssignments(memberAssignments);
log.debug("Duplicated assignments: {}", duplicatedAssignments);
Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>();
@@ -421,8 +459,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
// visible for testing
protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
ConnectorsAndTasks newSubmissions,
- List<WorkerLoad> completeWorkerAssignment,
- Map<String, ExtendedWorkerState> memberConfigs) {
+ List<WorkerLoad> completeWorkerAssignment) {
if (lostAssignments.isEmpty()) {
resetDelay();
return;
@@ -432,7 +469,10 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
log.debug("Found the following connectors and tasks missing from previous assignments: "
+ lostAssignments);
- if (scheduledRebalance <= 0 && memberConfigs.keySet().containsAll(previousMembers)) {
+ Set<String> activeMembers = completeWorkerAssignment.stream()
+ .map(WorkerLoad::worker)
+ .collect(Collectors.toSet());
+ if (scheduledRebalance <= 0 && activeMembers.containsAll(previousMembers)) {
log.debug("No worker seems to have departed the group during the rebalance. The "
+ "missing assignments that the leader is detecting are probably due to some "
+ "workers failing to receive the new assignments in the previous rebalance. "
@@ -489,7 +529,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
log.debug("Delayed rebalance in progress. Task reassignment is postponed. New computed rebalance delay: {}", delay);
} else {
// This means scheduledRebalance == 0
- // We could also also extract the current minimum delay from the group, to make
+ // We could also extract the current minimum delay from the group, to make
// independent of consecutive leader failures, but this optimization is skipped
// at the moment
delay = maxDelay;
@@ -526,7 +566,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
}
/**
- * Task revocation is based on an rough estimation of the lower average number of tasks before
+ * Task revocation is based on a rough estimation of the lower average number of tasks before
* and after new workers join the group. If no new workers join, no revocation takes place.
* Based on this estimation, tasks are revoked until the new floor average is reached for
* each existing worker. The revoked tasks, once assigned to the new workers will maintain
@@ -610,16 +650,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
private Map<String, ExtendedAssignment> fillAssignments(Collection<String> members, short error,
String leaderId, String leaderUrl, long maxOffset,
- Map<String, Collection<String>> connectorAssignments,
- Map<String, Collection<ConnectorTaskId>> taskAssignments,
- Map<String, ConnectorsAndTasks> revoked,
+ ClusterAssignment clusterAssignment,
int delay, short protocolVersion) {
Map<String, ExtendedAssignment> groupAssignment = new HashMap<>();
for (String member : members) {
- Collection<String> connectorsToStart = connectorAssignments.getOrDefault(member, Collections.emptyList());
- Collection<ConnectorTaskId> tasksToStart = taskAssignments.getOrDefault(member, Collections.emptyList());
- Collection<String> connectorsToStop = revoked.getOrDefault(member, ConnectorsAndTasks.EMPTY).connectors();
- Collection<ConnectorTaskId> tasksToStop = revoked.getOrDefault(member, ConnectorsAndTasks.EMPTY).tasks();
+ Collection<String> connectorsToStart = clusterAssignment.newlyAssignedConnectors(member);
+ Collection<ConnectorTaskId> tasksToStart = clusterAssignment.newlyAssignedTasks(member);
+ Collection<String> connectorsToStop = clusterAssignment.newlyRevokedConnectors(member);
+ Collection<ConnectorTaskId> tasksToStop = clusterAssignment.newlyRevokedTasks(member);
ExtendedAssignment assignment =
new ExtendedAssignment(protocolVersion, error, leaderId, leaderUrl, maxOffset,
connectorsToStart, tasksToStart, connectorsToStop, tasksToStop, delay);
@@ -637,12 +675,13 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
* @param assignments the map of worker assignments
* @return the serialized map of assignments to workers
*/
- protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments) {
+ protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments, short protocolVersion) {
+ boolean sessioned = protocolVersion >= CONNECT_PROTOCOL_V2;
return assignments.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
- e -> IncrementalCooperativeConnectProtocol.serializeAssignment(e.getValue())));
+ e -> IncrementalCooperativeConnectProtocol.serializeAssignment(e.getValue(), sessioned)));
}
private static ConnectorsAndTasks diff(ConnectorsAndTasks base,
@@ -661,23 +700,18 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
Map<String, Collection<T>> incremental = new HashMap<>();
for (Map.Entry<String, Collection<T>> entry : base.entrySet()) {
List<T> values = new ArrayList<>(entry.getValue());
- values.removeAll(toSubtract.get(entry.getKey()));
+ values.removeAll(toSubtract.getOrDefault(entry.getKey(), Collections.emptySet()));
incremental.put(entry.getKey(), values);
}
return incremental;
}
- private ConnectorsAndTasks assignment(Map<String, ExtendedWorkerState> memberConfigs) {
- log.debug("Received assignments: {}", memberConfigs);
- Set<String> connectors = memberConfigs.values()
- .stream()
- .flatMap(state -> state.assignment().connectors().stream())
- .collect(Collectors.toSet());
- Set<ConnectorTaskId> tasks = memberConfigs.values()
- .stream()
- .flatMap(state -> state.assignment().tasks().stream())
- .collect(Collectors.toSet());
- return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
+ private ConnectorsAndTasks assignment(Map<String, ConnectorsAndTasks> memberAssignments) {
+ log.debug("Received assignments: {}", memberAssignments);
+ return new ConnectorsAndTasks.Builder().with(
+ ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::connectors),
+ ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::tasks)
+ ).build();
}
private int calculateDelay(long now) {
@@ -745,22 +779,120 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
}
}
- private static List<WorkerLoad> workerAssignment(Map<String, ExtendedWorkerState> memberConfigs,
+ private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks> memberAssignments,
ConnectorsAndTasks toExclude) {
ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
.with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks()))
.build();
- return memberConfigs.entrySet().stream()
+ return memberAssignments.entrySet().stream()
.map(e -> new WorkerLoad.Builder(e.getKey()).with(
- e.getValue().assignment().connectors().stream()
+ e.getValue().connectors().stream()
.filter(v -> !ignore.connectors().contains(v))
.collect(Collectors.toList()),
- e.getValue().assignment().tasks().stream()
+ e.getValue().tasks().stream()
.filter(v -> !ignore.tasks().contains(v))
.collect(Collectors.toList())
).build()
).collect(Collectors.toList());
}
+ static class ClusterAssignment {
+
+ private final Map<String, Collection<String>> newlyAssignedConnectors;
+ private final Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks;
+ private final Map<String, Collection<String>> newlyRevokedConnectors;
+ private final Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks;
+ private final Map<String, Collection<String>> allAssignedConnectors;
+ private final Map<String, Collection<ConnectorTaskId>> allAssignedTasks;
+ private final Set<String> allWorkers;
+
+ public static final ClusterAssignment EMPTY = new ClusterAssignment(
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ );
+
+ public ClusterAssignment(
+ Map<String, Collection<String>> newlyAssignedConnectors,
+ Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks,
+ Map<String, Collection<String>> newlyRevokedConnectors,
+ Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks,
+ Map<String, Collection<String>> allAssignedConnectors,
+ Map<String, Collection<ConnectorTaskId>> allAssignedTasks
+ ) {
+ this.newlyAssignedConnectors = newlyAssignedConnectors;
+ this.newlyAssignedTasks = newlyAssignedTasks;
+ this.newlyRevokedConnectors = newlyRevokedConnectors;
+ this.newlyRevokedTasks = newlyRevokedTasks;
+ this.allAssignedConnectors = allAssignedConnectors;
+ this.allAssignedTasks = allAssignedTasks;
+ this.allWorkers = combineCollections(
+ Arrays.asList(newlyAssignedConnectors, newlyAssignedTasks, newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors, allAssignedTasks),
+ Map::keySet,
+ Collectors.toSet()
+ );
+ }
+
+ public Map<String, Collection<String>> newlyAssignedConnectors() {
+ return newlyAssignedConnectors;
+ }
+
+ public Collection<String> newlyAssignedConnectors(String worker) {
+ return newlyAssignedConnectors.getOrDefault(worker, Collections.emptySet());
+ }
+
+ public Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks() {
+ return newlyAssignedTasks;
+ }
+
+ public Collection<ConnectorTaskId> newlyAssignedTasks(String worker) {
+ return newlyAssignedTasks.getOrDefault(worker, Collections.emptySet());
+ }
+
+ public Map<String, Collection<String>> newlyRevokedConnectors() {
+ return newlyRevokedConnectors;
+ }
+
+ public Collection<String> newlyRevokedConnectors(String worker) {
+ return newlyRevokedConnectors.getOrDefault(worker, Collections.emptySet());
+ }
+
+ public Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks() {
+ return newlyRevokedTasks;
+ }
+
+ public Collection<ConnectorTaskId> newlyRevokedTasks(String worker) {
+ return newlyRevokedTasks.getOrDefault(worker, Collections.emptySet());
+ }
+
+ public Map<String, Collection<String>> allAssignedConnectors() {
+ return allAssignedConnectors;
+ }
+
+ public Map<String, Collection<ConnectorTaskId>> allAssignedTasks() {
+ return allAssignedTasks;
+ }
+
+ public Set<String> allWorkers() {
+ return allWorkers;
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterAssignment{"
+ + "newlyAssignedConnectors=" + newlyAssignedConnectors
+ + ", newlyAssignedTasks=" + newlyAssignedTasks
+ + ", newlyRevokedConnectors=" + newlyRevokedConnectors
+ + ", newlyRevokedTasks=" + newlyRevokedTasks
+ + ", allAssignedConnectors=" + allAssignedConnectors
+ + ", allAssignedTasks=" + allAssignedTasks
+ + ", allWorkers=" + allWorkers
+ + '}';
+ }
+ }
+
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
index 6bcf9be65e..c32009c794 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
@@ -154,7 +154,7 @@ public class IncrementalCooperativeConnectProtocol {
.set(CONFIG_OFFSET_KEY_NAME, workerState.offset());
// Not a big issue if we embed the protocol version with the assignment in the metadata
Struct allocation = new Struct(ALLOCATION_V1)
- .set(ALLOCATION_KEY_NAME, serializeAssignment(workerState.assignment()));
+ .set(ALLOCATION_KEY_NAME, serializeAssignment(workerState.assignment(), sessioned));
Struct connectProtocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : CONNECT_PROTOCOL_HEADER_V1;
ByteBuffer buffer = ByteBuffer.allocate(connectProtocolHeader.sizeOf()
+ CONFIG_STATE_V1.sizeOf(configState)
@@ -230,15 +230,16 @@ public class IncrementalCooperativeConnectProtocol {
* ScheduledDelay => Int32
* </pre>
*/
- public static ByteBuffer serializeAssignment(ExtendedAssignment assignment) {
+ public static ByteBuffer serializeAssignment(ExtendedAssignment assignment, boolean sessioned) {
// comparison depends on reference equality for now
if (assignment == null || ExtendedAssignment.empty().equals(assignment)) {
return null;
}
Struct struct = assignment.toStruct();
- ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+ Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : CONNECT_PROTOCOL_HEADER_V1;
+ ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf()
+ ASSIGNMENT_V1.sizeOf(struct));
- CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+ protocolHeader.writeTo(buffer);
ASSIGNMENT_V1.write(buffer, struct);
buffer.flip();
return buffer;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 65720e2a78..77fb16eaa3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -416,6 +416,29 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
return allMembers.get(ownerId).url();
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof LeaderState)) return false;
+ LeaderState that = (LeaderState) o;
+ return Objects.equals(allMembers, that.allMembers)
+ && Objects.equals(connectorOwners, that.connectorOwners)
+ && Objects.equals(taskOwners, that.taskOwners);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(allMembers, connectorOwners, taskOwners);
+ }
+
+ @Override
+ public String toString() {
+ return "LeaderState{"
+ + "allMembers=" + allMembers
+ + ", connectorOwners=" + connectorOwners
+ + ", taskOwners=" + taskOwners
+ + '}';
+ }
}
public static class ConnectorsAndTasks {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 7adbd8f92d..bbf2477603 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -30,10 +30,15 @@ import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
public final class ConnectUtils {
private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
@@ -160,4 +165,30 @@ public final class ConnectUtils {
return SourceConnector.class.isAssignableFrom(connector.getClass());
}
+ public static <K, I, O> Map<K, O> transformValues(Map<K, I> map, Function<I, O> transformation) {
+ return map.entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ transformation.compose(Map.Entry::getValue)
+ ));
+ }
+
+ public static <I> List<I> combineCollections(Collection<Collection<I>> collections) {
+ return combineCollections(collections, Function.identity());
+ }
+
+ public static <I, T> List<T> combineCollections(Collection<I> collection, Function<I, Collection<T>> extractCollection) {
+ return combineCollections(collection, extractCollection, Collectors.toList());
+ }
+
+ public static <I, T, C> C combineCollections(
+ Collection<I> collection,
+ Function<I, Collection<T>> extractCollection,
+ Collector<T, ?, C> collector
+ ) {
+ return collection.stream()
+ .map(extractCollection)
+ .flatMap(Collection::stream)
+ .collect(collector);
+ }
+
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
index 883574957d..fdb4c542f7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
@@ -16,32 +16,22 @@
*/
package org.apache.kafka.connect.runtime.distributed;
-import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
public class ConnectProtocolCompatibilityTest {
+ private static final String LEADER = "leader";
private static final String LEADER_URL = "leaderUrl:8083";
+ private static final long CONFIG_OFFSET = 1;
private String connectorId1 = "connector1";
private String connectorId2 = "connector2";
@@ -51,95 +41,58 @@ public class ConnectProtocolCompatibilityTest {
private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
- @Rule
- public MockitoRule rule = MockitoJUnit.rule();
-
- @Mock
- private KafkaConfigBackingStore configStorage;
- private ClusterConfigState configState;
-
- @Before
- public void setup() {
- configStorage = mock(KafkaConfigBackingStore.class);
- configState = new ClusterConfigState(
- 1L,
- null,
- Collections.singletonMap(connectorId1, 1),
- Collections.singletonMap(connectorId1, new HashMap<>()),
- Collections.singletonMap(connectorId1, TargetState.STARTED),
- Collections.singletonMap(taskId1x0, new HashMap<>()),
- Collections.emptySet());
- }
-
- @After
- public void teardown() {
- verifyNoMoreInteractions(configStorage);
- }
-
@Test
public void testEagerToEagerMetadata() {
- when(configStorage.snapshot()).thenReturn(configState);
- ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+ ConnectProtocol.WorkerState workerState = emptyWorkerState();
ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
assertEquals(LEADER_URL, state.url());
assertEquals(1, state.offset());
- verify(configStorage).snapshot();
}
@Test
public void testCoopToCoopMetadata() {
- when(configStorage.snapshot()).thenReturn(configState);
- ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+ ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V1);
ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, false);
ExtendedWorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
assertEquals(LEADER_URL, state.url());
assertEquals(1, state.offset());
- verify(configStorage).snapshot();
}
@Test
public void testSessionedToCoopMetadata() {
- when(configStorage.snapshot()).thenReturn(configState);
- ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+ ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V2);
ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, true);
ExtendedWorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
assertEquals(LEADER_URL, state.url());
assertEquals(1, state.offset());
- verify(configStorage).snapshot();
}
@Test
public void testSessionedToEagerMetadata() {
- when(configStorage.snapshot()).thenReturn(configState);
- ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+ ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V2);
ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, true);
ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
assertEquals(LEADER_URL, state.url());
assertEquals(1, state.offset());
- verify(configStorage).snapshot();
}
@Test
public void testCoopToEagerMetadata() {
- when(configStorage.snapshot()).thenReturn(configState);
- ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+ ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V1);
ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, false);
ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
assertEquals(LEADER_URL, state.url());
assertEquals(1, state.offset());
- verify(configStorage).snapshot();
}
@Test
public void testEagerToCoopMetadata() {
- when(configStorage.snapshot()).thenReturn(configState);
- ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(LEADER_URL, configStorage.snapshot().offset());
+ ConnectProtocol.WorkerState workerState = emptyWorkerState();
ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
ConnectProtocol.WorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
assertEquals(LEADER_URL, state.url());
assertEquals(1, state.offset());
- verify(configStorage).snapshot();
}
@Test
@@ -176,7 +129,7 @@ public class ConnectProtocolCompatibilityTest {
Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
Collections.emptyList(), Collections.emptyList(), 0);
- ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+ ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
assertFalse(leaderAssignment.failed());
assertEquals("leader", leaderAssignment.leader());
@@ -235,7 +188,7 @@ public class ConnectProtocolCompatibilityTest {
Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
Collections.emptyList(), Collections.emptyList(), 0);
- ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+ ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
assertFalse(leaderAssignment.failed());
assertEquals("leader", leaderAssignment.leader());
@@ -248,7 +201,7 @@ public class ConnectProtocolCompatibilityTest {
Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
Collections.emptyList(), Collections.emptyList(), 0);
- ByteBuffer memberBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment2);
+ ByteBuffer memberBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment2, false);
ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(memberBuf);
assertFalse(memberAssignment.failed());
assertEquals("member", memberAssignment.leader());
@@ -257,4 +210,24 @@ public class ConnectProtocolCompatibilityTest {
assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
}
+ private ConnectProtocol.WorkerState emptyWorkerState() {
+ return new ConnectProtocol.WorkerState(LEADER_URL, CONFIG_OFFSET);
+ }
+
+ private ExtendedWorkerState emptyExtendedWorkerState(short protocolVersion) {
+ ExtendedAssignment assignment = new ExtendedAssignment(
+ protocolVersion,
+ ConnectProtocol.Assignment.NO_ERROR,
+ LEADER,
+ LEADER_URL,
+ CONFIG_OFFSET,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ 0
+ );
+ return new ExtendedWorkerState(LEADER_URL, CONFIG_OFFSET, assignment);
+ }
+
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 0ad7f63477..bd51456b72 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -17,25 +17,18 @@
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -44,93 +37,57 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-@RunWith(Parameterized.class)
public class IncrementalCooperativeAssignorTest {
- @Rule
- public MockitoRule rule = MockitoJUnit.rule();
- @Mock
- private WorkerCoordinator coordinator;
-
- @Captor
- ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
-
- @Parameters
- public static Iterable<?> mode() {
- return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
- }
-
- @Parameter
- public short protocolVersion;
+ // Offset isn't used in most tests but is required for creating a config snapshot object,
+ // so just use some arbitrary constant for that
+ private static final long CONFIG_OFFSET = 618;
private Map<String, Integer> connectors;
- private Map<String, ExtendedWorkerState> memberConfigs;
- private long offset;
- private String leader;
- private String leaderUrl;
private Time time;
private int rebalanceDelay;
private IncrementalCooperativeAssignor assignor;
- private int rebalanceNum;
- Map<String, ExtendedAssignment> returnedAssignments;
+ private int generationId;
+ private ClusterAssignment returnedAssignments;
+ private Map<String, ConnectorsAndTasks> memberAssignments;
@Before
public void setup() {
- leader = "worker1";
- leaderUrl = expectedLeaderUrl(leader);
- offset = 10;
+ generationId = 1000;
+ time = Time.SYSTEM;
+ rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
connectors = new HashMap<>();
addNewConnector("connector1", 4);
addNewConnector("connector2", 4);
- memberConfigs = memberConfigs(leader, offset, "worker1");
- time = Time.SYSTEM;
- rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
+ memberAssignments = new HashMap<>();
+ addNewEmptyWorkers("worker1");
initAssignor();
}
- @After
- public void teardown() {
- verifyNoMoreInteractions(coordinator);
- }
-
public void initAssignor() {
- assignor = Mockito.spy(new IncrementalCooperativeAssignor(
- new LogContext(),
- time,
- rebalanceDelay));
- assignor.previousGenerationId = 1000;
+ assignor = new IncrementalCooperativeAssignor(new LogContext(), time, rebalanceDelay);
+ assignor.previousGenerationId = generationId;
}
@Test
public void testTaskAssignmentWhenWorkerJoins() {
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 1 worker and 2 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
@@ -158,8 +115,6 @@ public class IncrementalCooperativeAssignorTest {
performStandardRebalance();
assertDelay(0);
assertEmptyAssignment();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -168,9 +123,6 @@ public class IncrementalCooperativeAssignorTest {
time = new MockTime();
initAssignor();
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performStandardRebalance();
@@ -205,8 +157,6 @@ public class IncrementalCooperativeAssignorTest {
assertConnectorAllocations(2);
assertTaskAllocations(8);
assertBalancedAndCompleteAllocation();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -215,9 +165,6 @@ public class IncrementalCooperativeAssignorTest {
time = new MockTime();
initAssignor();
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performStandardRebalance();
@@ -263,8 +210,6 @@ public class IncrementalCooperativeAssignorTest {
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -273,9 +218,6 @@ public class IncrementalCooperativeAssignorTest {
time = new MockTime();
initAssignor();
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 3 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2", "worker3");
performStandardRebalance();
@@ -289,13 +231,10 @@ public class IncrementalCooperativeAssignorTest {
// group was the leader. The new leader has no previous assignments and is not tracking a
// delay upon a leader's exit
removeWorkers("worker1");
- leader = "worker2";
- leaderUrl = expectedLeaderUrl(leader);
// The fact that the leader bounces means that the assignor starts from a clean slate
initAssignor();
// Capture needs to be reset to point to the new assignor
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
performStandardRebalance();
assertDelay(0);
assertWorkers("worker2", "worker3");
@@ -307,8 +246,6 @@ public class IncrementalCooperativeAssignorTest {
performStandardRebalance();
assertDelay(0);
assertEmptyAssignment();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -317,9 +254,6 @@ public class IncrementalCooperativeAssignorTest {
time = new MockTime();
initAssignor();
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 3 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2", "worker3");
performStandardRebalance();
@@ -333,13 +267,10 @@ public class IncrementalCooperativeAssignorTest {
// group was the leader. The new leader has no previous assignments and is not tracking a
// delay upon a leader's exit
removeWorkers("worker1");
- leader = "worker2";
- leaderUrl = expectedLeaderUrl(leader);
// The fact that the leader bounces means that the assignor starts from a clean slate
initAssignor();
// Capture needs to be reset to point to the new assignor
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
performStandardRebalance();
assertDelay(0);
assertWorkers("worker2", "worker3");
@@ -363,8 +294,6 @@ public class IncrementalCooperativeAssignorTest {
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(2, 3, 3);
assertBalancedAndCompleteAllocation();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -373,10 +302,6 @@ public class IncrementalCooperativeAssignorTest {
time = new MockTime();
initAssignor();
- updateConfigSnapshot();
- doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
- .when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performFailedRebalance();
@@ -389,14 +314,11 @@ public class IncrementalCooperativeAssignorTest {
// Second assignment happens with members returning the same assignments (memberConfigs)
// as the first time. The assignor detects that the number of members did not change and
// avoids the rebalance delay, treating the lost assignments as new assignments.
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -405,9 +327,6 @@ public class IncrementalCooperativeAssignorTest {
time = new MockTime();
initAssignor();
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performStandardRebalance();
@@ -417,10 +336,6 @@ public class IncrementalCooperativeAssignorTest {
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
- updateConfigSnapshot();
- doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
- .when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// Second assignment triggered by a third worker joining. The computed assignment should
// revoke tasks from the existing group. But the assignment won't be correctly delivered.
addNewEmptyWorkers("worker3");
@@ -433,7 +348,6 @@ public class IncrementalCooperativeAssignorTest {
// Third assignment happens with members returning the same assignments (memberConfigs)
// as the first time.
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(0, 1, 1);
@@ -445,8 +359,6 @@ public class IncrementalCooperativeAssignorTest {
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(2, 3, 3);
assertBalancedAndCompleteAllocation();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -455,9 +367,6 @@ public class IncrementalCooperativeAssignorTest {
time = new MockTime();
initAssignor();
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performStandardRebalance();
@@ -480,7 +389,6 @@ public class IncrementalCooperativeAssignorTest {
// Third assignment happens with members returning the same assignments (memberConfigs)
// as the first time.
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
performRebalanceWithMismatchedGeneration();
assertDelay(0);
assertConnectorAllocations(0, 1, 1);
@@ -492,15 +400,11 @@ public class IncrementalCooperativeAssignorTest {
assertConnectorAllocations(0, 1, 1);
assertTaskAllocations(2, 3, 3);
assertBalancedAndCompleteAllocation();
-
- verifyCoordinatorInteractions();
}
@Test
public void testTaskAssignmentWhenConnectorsAreDeleted() {
addNewConnector("connector3", 4);
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
// First assignment with 1 worker and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
@@ -513,15 +417,11 @@ public class IncrementalCooperativeAssignorTest {
// Second assignment with an updated config state that reflects removal of a connector
removeConnector("connector3");
- offset++;
- updateConfigSnapshot();
performStandardRebalance();
assertDelay(0);
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();
-
- verifyCoordinatorInteractions();
}
@Test
@@ -589,7 +489,6 @@ public class IncrementalCooperativeAssignorTest {
@Test
public void testLostAssignmentHandlingWhenWorkerBounces() {
- // Customize assignor for this test case
time = new MockTime();
initAssignor();
@@ -601,15 +500,13 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
- memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
- new ArrayList<>(configuredAssignment.values()),
- memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -617,17 +514,16 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
- String flakyWorker = "worker1";
- WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
- removeWorkers(flakyWorker);
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
+ String flakyWorker = "worker1";
+ WorkerLoad lostLoad = configuredAssignment.remove(flakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
// Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -635,15 +531,14 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
assertEquals(rebalanceDelay, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
time.sleep(rebalanceDelay / 2);
rebalanceDelay /= 2;
// A new worker (probably returning worker) has joined
configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
- addNewEmptyWorkers(flakyWorker);
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.singleton(flakyWorker),
@@ -651,12 +546,12 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
assertEquals(rebalanceDelay, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
time.sleep(rebalanceDelay);
// The new worker has still no assignments
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertTrue("Wrong assignment of lost connectors",
configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
@@ -687,15 +582,13 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
- memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
- new ArrayList<>(configuredAssignment.values()),
- memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -703,17 +596,16 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
- String removedWorker = "worker1";
- WorkerLoad lostLoad = workerLoad(removedWorker, 2, 2, 4, 4);
- removeWorkers(removedWorker);
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
+ String removedWorker = "worker1";
+ WorkerLoad lostLoad = configuredAssignment.remove(removedWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
// Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -721,13 +613,13 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
assertEquals(rebalanceDelay, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+ assignor.previousMembers = new HashSet<>(memberAssignments.keySet());
time.sleep(rebalanceDelay / 2);
rebalanceDelay /= 2;
// No new worker has joined
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -738,7 +630,7 @@ public class IncrementalCooperativeAssignorTest {
time.sleep(rebalanceDelay);
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertTrue("Wrong assignment of lost connectors",
newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -765,15 +657,13 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
- memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
- new ArrayList<>(configuredAssignment.values()),
- memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -781,20 +671,19 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
- String flakyWorker = "worker1";
- WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
- removeWorkers(flakyWorker);
- String newWorker = "worker3";
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
+ String flakyWorker = "worker1";
+ WorkerLoad lostLoad = configuredAssignment.remove(flakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
- // Lost assignments detected - A new worker also has joined that is not the returning worker
+ String newWorker = "worker3";
configuredAssignment.put(newWorker, new WorkerLoad.Builder(newWorker).build());
- addNewEmptyWorkers(newWorker);
+
+ // Lost assignments detected - A new worker also has joined that is not the returning worker
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.singleton(newWorker),
@@ -802,15 +691,14 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
assertEquals(rebalanceDelay, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
time.sleep(rebalanceDelay / 2);
rebalanceDelay /= 2;
// Now two new workers have joined
configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
- addNewEmptyWorkers(flakyWorker);
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
Set<String> expectedWorkers = new HashSet<>();
expectedWorkers.addAll(Arrays.asList(newWorker, flakyWorker));
@@ -820,7 +708,7 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
assertEquals(rebalanceDelay, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
time.sleep(rebalanceDelay);
// The new workers have new assignments, other than the lost ones
@@ -829,7 +717,7 @@ public class IncrementalCooperativeAssignorTest {
// we don't reflect these new assignments in memberConfigs currently because they are not
// used in handleLostAssignments method
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
// both the newWorkers would need to be considered for re assignment of connectors and tasks
List<String> listOfConnectorsInLast2Workers = new ArrayList<>();
@@ -867,15 +755,13 @@ public class IncrementalCooperativeAssignorTest {
configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
- memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
newSubmissions,
- new ArrayList<>(configuredAssignment.values()),
- memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -883,17 +769,16 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
- String veryFlakyWorker = "worker1";
- WorkerLoad lostLoad = workerLoad(veryFlakyWorker, 2, 2, 4, 4);
- removeWorkers(veryFlakyWorker);
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
+ String veryFlakyWorker = "worker1";
+ WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
// Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
@@ -901,15 +786,14 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
assertEquals(rebalanceDelay, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
time.sleep(rebalanceDelay / 2);
rebalanceDelay /= 2;
// A new worker (probably returning worker) has joined
configuredAssignment.put(veryFlakyWorker, new WorkerLoad.Builder(veryFlakyWorker).build());
- addNewEmptyWorkers(veryFlakyWorker);
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertEquals("Wrong set of workers for reassignments",
Collections.singleton(veryFlakyWorker),
@@ -917,14 +801,13 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
assertEquals(rebalanceDelay, assignor.delay);
- assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+ assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
time.sleep(rebalanceDelay);
// The returning worker leaves permanently after joining briefly during the delay
configuredAssignment.remove(veryFlakyWorker);
- removeWorkers(veryFlakyWorker);
assignor.handleLostAssignments(lostAssignments, newSubmissions,
- new ArrayList<>(configuredAssignment.values()), memberConfigs);
+ new ArrayList<>(configuredAssignment.values()));
assertTrue("Wrong assignment of lost connectors",
newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -939,9 +822,6 @@ public class IncrementalCooperativeAssignorTest {
@Test
public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 1 worker and 2 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
@@ -975,15 +855,10 @@ public class IncrementalCooperativeAssignorTest {
performStandardRebalance();
assertDelay(0);
assertEmptyAssignment();
-
- verifyCoordinatorInteractions();
}
@Test
public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() {
- updateConfigSnapshot();
- doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
// First assignment with 1 worker and 2 connectors configured but not yet assigned
performStandardRebalance();
assertDelay(0);
@@ -994,7 +869,6 @@ public class IncrementalCooperativeAssignorTest {
// Delete connector1
removeConnector("connector1");
- updateConfigSnapshot();
// Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time
addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 4));
@@ -1021,8 +895,107 @@ public class IncrementalCooperativeAssignorTest {
performStandardRebalance();
assertDelay(0);
assertEmptyAssignment();
+ }
- verifyCoordinatorInteractions();
+ @Test
+ public void testLeaderStateUpdated() {
+ // Sanity test to make sure that the coordinator's leader state is actually updated after a rebalance
+ connectors.clear();
+ String leader = "followMe";
+ Map<String, ExtendedWorkerState> workerStates = new HashMap<>();
+ workerStates.put(leader, new ExtendedWorkerState("followMe:618", CONFIG_OFFSET, ExtendedAssignment.empty()));
+ WorkerCoordinator coordinator = mock(WorkerCoordinator.class);
+ when(coordinator.configSnapshot()).thenReturn(configState());
+ assignor.performTaskAssignment(
+ leader,
+ CONFIG_OFFSET,
+ workerStates,
+ coordinator,
+ IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2
+ );
+ verify(coordinator).leaderState(notNull());
+ }
+
+ @Test
+ public void testProtocolV1() {
+ // Sanity test to make sure that the right protocol is chosen during the assignment
+ connectors.clear();
+ String leader = "followMe";
+ List<JoinGroupResponseData.JoinGroupResponseMember> memberMetadata = new ArrayList<>();
+ ExtendedAssignment leaderAssignment = new ExtendedAssignment(
+ IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1,
+ ConnectProtocol.Assignment.NO_ERROR,
+ leader,
+ "followMe:618",
+ CONFIG_OFFSET,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ 0
+ );
+ ExtendedWorkerState leaderState = new ExtendedWorkerState("followMe:618", CONFIG_OFFSET, leaderAssignment);
+ JoinGroupResponseData.JoinGroupResponseMember leaderMetadata = new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId(leader)
+ .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(leaderState, false).array());
+ memberMetadata.add(leaderMetadata);
+ WorkerCoordinator coordinator = mock(WorkerCoordinator.class);
+ when(coordinator.configSnapshot()).thenReturn(configState());
+ Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment(
+ leader,
+ ConnectProtocolCompatibility.COMPATIBLE.protocol(),
+ memberMetadata,
+ coordinator
+ );
+ serializedAssignments.forEach((worker, serializedAssignment) -> {
+ ExtendedAssignment assignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(serializedAssignment);
+ assertEquals(
+ "Incorrect protocol version in assignment for worker " + worker,
+ IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1,
+ assignment.version()
+ );
+ });
+ }
+
+ @Test
+ public void testProtocolV2() {
+ // Sanity test to make sure that the right protocol is chosen during the assignment
+ connectors.clear();
+ String leader = "followMe";
+ List<JoinGroupResponseData.JoinGroupResponseMember> memberMetadata = new ArrayList<>();
+ ExtendedAssignment leaderAssignment = new ExtendedAssignment(
+ IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2,
+ ConnectProtocol.Assignment.NO_ERROR,
+ leader,
+ "followMe:618",
+ CONFIG_OFFSET,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ 0
+ );
+ ExtendedWorkerState leaderState = new ExtendedWorkerState("followMe:618", CONFIG_OFFSET, leaderAssignment);
+ JoinGroupResponseData.JoinGroupResponseMember leaderMetadata = new JoinGroupResponseData.JoinGroupResponseMember()
+ .setMemberId(leader)
+ .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(leaderState, true).array());
+ memberMetadata.add(leaderMetadata);
+ WorkerCoordinator coordinator = mock(WorkerCoordinator.class);
+ when(coordinator.configSnapshot()).thenReturn(configState());
+ Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment(
+ leader,
+ ConnectProtocolCompatibility.SESSIONED.protocol(),
+ memberMetadata,
+ coordinator
+ );
+ serializedAssignments.forEach((worker, serializedAssignment) -> {
+ ExtendedAssignment assignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(serializedAssignment);
+ assertEquals(
+ "Incorrect protocol version in assignment for worker " + worker,
+ IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2,
+ assignment.version()
+ );
+ });
}
private void performStandardRebalance() {
@@ -1037,23 +1010,12 @@ public class IncrementalCooperativeAssignorTest {
performRebalance(false, true);
}
- private void performRebalance(boolean assignmentFailure, boolean expectGenerationMismatch) {
- expectGeneration(expectGenerationMismatch);
- // Member configs are tracked by the assignor; create a deep copy here so that modifications to our own memberConfigs field
- // are not accidentally propagated to the one used by the assignor
- Map<String, ExtendedWorkerState> memberConfigsCopy = memberConfigs.entrySet().stream().collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> {
- ExtendedWorkerState originalWorkerState = e.getValue();
- return new ExtendedWorkerState(
- originalWorkerState.url(),
- originalWorkerState.offset(),
- duplicate(originalWorkerState.assignment())
- );
- }
- ));
+ private void performRebalance(boolean assignmentFailure, boolean generationMismatch) {
+ generationId++;
+ int lastCompletedGenerationId = generationMismatch ? generationId - 2 : generationId - 1;
try {
- assignor.performTaskAssignment(leader, offset, memberConfigsCopy, coordinator, protocolVersion);
+ Map<String, ConnectorsAndTasks> memberAssignmentsCopy = new HashMap<>(memberAssignments);
+ returnedAssignments = assignor.performTaskAssignment(configState(), lastCompletedGenerationId, generationId, memberAssignmentsCopy);
} catch (RuntimeException e) {
if (assignmentFailure) {
RequestFuture.failure(e);
@@ -1061,21 +1023,12 @@ public class IncrementalCooperativeAssignorTest {
throw e;
}
}
- ++rebalanceNum;
- returnedAssignments = assignmentsCapture.getValue();
assertNoRedundantAssignments();
if (!assignmentFailure) {
- applyAssignments(leader, offset, returnedAssignments);
+ applyAssignments();
}
}
- private void expectGeneration(boolean expectMismatch) {
- when(coordinator.generationId())
- .thenReturn(assignor.previousGenerationId + 1);
- int lastCompletedGenerationId = expectMismatch ? assignor.previousGenerationId - 1 : assignor.previousGenerationId;
- when(coordinator.lastCompletedGenerationId()).thenReturn(lastCompletedGenerationId);
- }
-
private void addNewEmptyWorkers(String... workers) {
for (String worker : workers) {
addNewWorker(worker, Collections.emptyList(), Collections.emptyList());
@@ -1083,12 +1036,10 @@ public class IncrementalCooperativeAssignorTest {
}
private void addNewWorker(String worker, List<String> connectors, List<ConnectorTaskId> tasks) {
- ExtendedAssignment assignment = newExpandableAssignment();
- assignment.connectors().addAll(connectors);
- assignment.tasks().addAll(tasks);
+ ConnectorsAndTasks assignment = new ConnectorsAndTasks.Builder().withCopies(connectors, tasks).build();
assertNull(
"Worker " + worker + " already exists",
- memberConfigs.put(worker, new ExtendedWorkerState(leaderUrl, offset, assignment))
+ memberAssignments.put(worker, assignment)
);
}
@@ -1096,7 +1047,7 @@ public class IncrementalCooperativeAssignorTest {
for (String worker : workers) {
assertNotNull(
"Worker " + worker + " does not exist",
- memberConfigs.remove(worker)
+ memberAssignments.remove(worker)
);
}
}
@@ -1142,20 +1093,10 @@ public class IncrementalCooperativeAssignorTest {
);
}
- private void updateConfigSnapshot() {
- when(coordinator.configSnapshot()).thenReturn(configState());
- }
-
private ClusterConfigState configState() {
Map<String, Integer> taskCounts = new HashMap<>(connectors);
- Map<String, Map<String, String>> connectorConfigs = taskCounts.keySet().stream().collect(Collectors.toMap(
- Function.identity(),
- connector -> Collections.emptyMap()
- ));
- Map<String, TargetState> targetStates = taskCounts.keySet().stream().collect(Collectors.toMap(
- Function.identity(),
- connector -> TargetState.STARTED
- ));
+ Map<String, Map<String, String>> connectorConfigs = transformValues(taskCounts, c -> Collections.emptyMap());
+ Map<String, TargetState> targetStates = transformValues(taskCounts, c -> TargetState.STARTED);
Map<ConnectorTaskId, Map<String, String>> taskConfigs = taskCounts.entrySet().stream()
.flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> new ConnectorTaskId(e.getKey(), i)))
.collect(Collectors.toMap(
@@ -1163,78 +1104,59 @@ public class IncrementalCooperativeAssignorTest {
connectorTaskId -> Collections.emptyMap()
));
return new ClusterConfigState(
- offset,
+ CONFIG_OFFSET,
null,
taskCounts,
connectorConfigs,
targetStates,
taskConfigs,
- Collections.emptySet()
- );
- }
-
- private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, String... workers) {
- return Stream.of(workers).collect(Collectors.toMap(
- Function.identity(),
- w -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, newExpandableAssignment(givenLeader, givenOffset))
- ));
+ Collections.emptySet());
}
- private void applyAssignments(String leader, long offset, Map<String, ExtendedAssignment> newAssignments) {
- newAssignments.forEach((worker, newAssignment) -> {
- ExtendedAssignment workerAssignment = Optional.ofNullable(memberConfigs.get(worker))
- .map(ExtendedWorkerState::assignment)
- .orElseGet(this::newExpandableAssignment);
- workerAssignment.connectors().removeAll(newAssignment.revokedConnectors());
- workerAssignment.connectors().addAll(newAssignment.connectors());
- workerAssignment.tasks().removeAll(newAssignment.revokedTasks());
- workerAssignment.tasks().addAll(newAssignment.tasks());
- memberConfigs.put(worker, new ExtendedWorkerState(expectedLeaderUrl(leader), offset, workerAssignment));
- });
- }
-
- private ExtendedAssignment newExpandableAssignment() {
- return newExpandableAssignment(leader, offset);
- }
+ private void applyAssignments() {
+ returnedAssignments.allWorkers().forEach(worker -> {
+ ConnectorsAndTasks workerAssignment = memberAssignments.computeIfAbsent(worker, ignored -> new ConnectorsAndTasks.Builder().build());
- private ExtendedAssignment newExpandableAssignment(String leader, long offset) {
- return new ExtendedAssignment(
- protocolVersion,
- ConnectProtocol.Assignment.NO_ERROR,
- leader,
- expectedLeaderUrl(leader),
- offset,
- new ArrayList<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- 0);
- }
+ workerAssignment.connectors().removeAll(returnedAssignments.newlyRevokedConnectors(worker));
+ workerAssignment.connectors().addAll(returnedAssignments.newlyAssignedConnectors(worker));
+ workerAssignment.tasks().removeAll(returnedAssignments.newlyRevokedTasks(worker));
+ workerAssignment.tasks().addAll(returnedAssignments.newlyAssignedTasks(worker));
- private static String expectedLeaderUrl(String givenLeader) {
- return "http://" + givenLeader + ":8083";
+ assertEquals(
+ "Complete connector assignment for worker " + worker + " does not match expectations " +
+ "based on prior assignment and new revocations and assignments",
+ workerAssignment.connectors(),
+ returnedAssignments.allAssignedConnectors().get(worker)
+ );
+ assertEquals(
+ "Complete task assignment for worker " + worker + " does not match expectations " +
+ "based on prior assignment and new revocations and assignments",
+ workerAssignment.tasks(),
+ returnedAssignments.allAssignedTasks().get(worker)
+ );
+ });
}
private void assertEmptyAssignment() {
assertEquals(
"No connectors should have been newly assigned during this round",
Collections.emptyList(),
- extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::connectors)
+ ConnectUtils.combineCollections(returnedAssignments.newlyAssignedConnectors().values())
);
assertEquals(
"No tasks should have been newly assigned during this round",
Collections.emptyList(),
- extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::tasks)
+ ConnectUtils.combineCollections(returnedAssignments.newlyAssignedTasks().values())
);
assertEquals(
"No connectors should have been revoked during this round",
Collections.emptyList(),
- extractFromAssignments(returnedAssignments, ExtendedAssignment::revokedConnectors)
+ ConnectUtils.combineCollections(returnedAssignments.newlyRevokedConnectors().values())
);
assertEquals(
"No tasks should have been revoked during this round",
Collections.emptyList(),
- extractFromAssignments(returnedAssignments, ExtendedAssignment::revokedTasks)
+ ConnectUtils.combineCollections(returnedAssignments.newlyRevokedTasks().values())
);
}
@@ -1242,7 +1164,7 @@ public class IncrementalCooperativeAssignorTest {
assertEquals(
"Wrong set of workers",
new HashSet<>(Arrays.asList(workers)),
- returnedAssignments.keySet()
+ returnedAssignments.allWorkers()
);
}
@@ -1253,7 +1175,7 @@ public class IncrementalCooperativeAssignorTest {
* and one worker that is assigned three connectors.
*/
private void assertConnectorAllocations(int... connectorCounts) {
- assertAllocations("connectors", ExtendedAssignment::connectors, connectorCounts);
+ assertAllocations("connectors", ConnectorsAndTasks::connectors, connectorCounts);
}
/**
@@ -1263,10 +1185,10 @@ public class IncrementalCooperativeAssignorTest {
* and one worker that is assigned three tasks.
*/
private void assertTaskAllocations(int... taskCounts) {
- assertAllocations("tasks", ExtendedAssignment::tasks, taskCounts);
+ assertAllocations("tasks", ConnectorsAndTasks::tasks, taskCounts);
}
- private void assertAllocations(String allocated, Function<ExtendedAssignment, ? extends Collection<?>> allocation, int... rawExpectedAllocations) {
+ private void assertAllocations(String allocated, Function<ConnectorsAndTasks, ? extends Collection<?>> allocation, int... rawExpectedAllocations) {
List<Integer> expectedAllocations = IntStream.of(rawExpectedAllocations)
.boxed()
.sorted()
@@ -1279,9 +1201,8 @@ public class IncrementalCooperativeAssignorTest {
);
}
- private List<Integer> allocations(Function<ExtendedAssignment, ? extends Collection<?>> allocation) {
- return memberConfigs.values().stream()
- .map(ExtendedWorkerState::assignment)
+ private List<Integer> allocations(Function<ConnectorsAndTasks, ? extends Collection<?>> allocation) {
+ return memberAssignments.values().stream()
.map(allocation)
.map(Collection::size)
.sorted()
@@ -1289,8 +1210,11 @@ public class IncrementalCooperativeAssignorTest {
}
private void assertDelay(int expectedDelay) {
- returnedAssignments.values().forEach(a -> assertEquals(
- "Wrong rebalance delay in " + a, expectedDelay, a.delay()));
+ assertEquals(
+ "Wrong rebalance delay",
+ expectedDelay,
+ assignor.delay
+ );
}
/**
@@ -1298,15 +1222,10 @@ public class IncrementalCooperativeAssignorTest {
* and that each newly-assigned connector and task is only assigned to a single worker.
*/
private void assertNoRedundantAssignments() {
- Map<String, ExtendedAssignment> existingAssignments = memberConfigs.entrySet().stream().collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> e.getValue().assignment()
- ));
-
- List<String> existingConnectors = extractFromAssignments(existingAssignments, ConnectProtocol.Assignment::connectors);
- List<String> newConnectors = extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::connectors);
- List<ConnectorTaskId> existingTasks = extractFromAssignments(existingAssignments, ConnectProtocol.Assignment::tasks);
- List<ConnectorTaskId> newTasks = extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::tasks);
+ List<String> existingConnectors = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::connectors);
+ List<String> newConnectors = ConnectUtils.combineCollections(returnedAssignments.newlyAssignedConnectors().values());
+ List<ConnectorTaskId> existingTasks = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::tasks);
+ List<ConnectorTaskId> newTasks = ConnectUtils.combineCollections(returnedAssignments.newlyAssignedTasks().values());
assertNoDuplicates(
newConnectors,
@@ -1333,8 +1252,8 @@ public class IncrementalCooperativeAssignorTest {
}
private void assertBalancedAllocation() {
- List<Integer> connectorCounts = allocations(ExtendedAssignment::connectors);
- List<Integer> taskCounts = allocations(ExtendedAssignment::tasks);
+ List<Integer> connectorCounts = allocations(ConnectorsAndTasks::connectors);
+ List<Integer> taskCounts = allocations(ConnectorsAndTasks::tasks);
int minConnectors = connectorCounts.get(0);
int maxConnectors = connectorCounts.get(connectorCounts.size() - 1);
@@ -1353,14 +1272,15 @@ public class IncrementalCooperativeAssignorTest {
}
private void assertCompleteAllocation() {
- List<String> allAssignedConnectors = extractFromAssignments(memberConfigs, e -> e.assignment().connectors());
+ List<String> allAssignedConnectors = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::connectors);
assertEquals(
"The set of connectors assigned across the cluster does not match the set of connectors in the config topic",
connectors.keySet(),
new HashSet<>(allAssignedConnectors)
);
- Map<String, List<ConnectorTaskId>> allAssignedTasks = extractFromAssignments(memberConfigs, e -> e.assignment().tasks()).stream()
+ Map<String, List<ConnectorTaskId>> allAssignedTasks = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::tasks)
+ .stream()
.collect(Collectors.groupingBy(ConnectorTaskId::connector, Collectors.toList()));
connectors.forEach((connector, taskCount) -> {
@@ -1375,24 +1295,6 @@ public class IncrementalCooperativeAssignorTest {
});
}
- private void verifyCoordinatorInteractions() {
- verify(coordinator, times(rebalanceNum)).configSnapshot();
- verify(coordinator, times(rebalanceNum)).leaderState(any());
- verify(coordinator, times(2 * rebalanceNum)).generationId();
- verify(coordinator, times(rebalanceNum)).memberId();
- verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
- }
-
- private static <A, T> List<T> extractFromAssignments(
- Map<String, A> assignments,
- Function<A, Collection<T>> extraction
- ) {
- return assignments.values().stream()
- .map(extraction)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
- }
-
private static <T> void assertNoDuplicates(List<T> collection, String assertionMessage) {
assertEquals(
assertionMessage,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index 35ba6249d7..3a15956ff1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -215,7 +215,7 @@ public class WorkerCoordinatorIncrementalTest {
CONNECT_PROTOCOL_V1, ExtendedAssignment.NO_ERROR, leaderId, leaderUrl, configState1.offset(),
Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
Collections.emptyList(), Collections.emptyList(), 0);
- ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+ ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
// Using onJoinComplete to register the protocol selection decided by the broker
// coordinator as well as an existing previous assignment that the call to metadata will
// include with v1 but not with v0
@@ -246,7 +246,7 @@ public class WorkerCoordinatorIncrementalTest {
CONNECT_PROTOCOL_V1, ExtendedAssignment.NO_ERROR, leaderId, leaderUrl, configState1.offset(),
Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
Collections.emptyList(), Collections.emptyList(), 0);
- ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+ ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
// Using onJoinComplete to register the protocol selection decided by the broker
// coordinator as well as an existing previous assignment that the call to metadata will
// include with v1 but not with v0