You are viewing a plain text version of this content. The canonical version is available from the original mailing-list archive (the hyperlink was not preserved in this plain-text copy).
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/28 17:50:55 UTC
[kafka] branch trunk updated: MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder (#13286)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7322f4cd55d MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder (#13286)
7322f4cd55d is described below
commit 7322f4cd55dc08abdc6ccf51ed33f7f0d869dd0e
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Tue Feb 28 09:50:44 2023 -0800
MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder (#13286)
Reviewers: Chris Egerton <ch...@aiven.io>
---
.../kafka/connect/runtime/AbstractHerderTest.java | 28 +++-
.../runtime/distributed/DistributedHerderTest.java | 157 ++++++++++++++++-----
2 files changed, 146 insertions(+), 39 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 032f09322d7..7cb0b056b9f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -129,12 +129,28 @@ public class AbstractHerderTest {
TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
}
- private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
- private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+ private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
private final String workerId = "workerId";
private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 722be88fb7e..03aadd31166 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -193,15 +193,39 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
}
- private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
- private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.PAUSED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
- private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+ private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.PAUSED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
private static final String WORKER_ID = "localhost:8083";
private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@@ -623,9 +647,17 @@ public class DistributedHerderTest {
// Same as SNAPSHOT, except with an updated offset
// Allow the task to read to the end of the topic and complete the rebalance
ClusterConfigState secondSnapshot = new ClusterConfigState(
- configOffset, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+ configOffset,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet()
+ );
expectConfigRefreshAndSnapshot(secondSnapshot);
}
member.requestRejoin();
@@ -1962,9 +1994,19 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
// During the next tick, throw an error from the transformer
- ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), configTransformer);
+ ClusterConfigState snapshotWithTransform = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ configTransformer
+ );
EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithTransform);
EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
.andThrow(new ConfigException("Simulated exception thrown during config transformation"));
@@ -2558,9 +2600,18 @@ public class DistributedHerderTest {
EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
.andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info"));
EasyMock.replay(configTransformer);
- ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), configTransformer);
+ ClusterConfigState snapshotWithTransform = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ configTransformer);
expectConfigRefreshAndSnapshot(snapshotWithTransform);
@@ -2721,9 +2772,17 @@ public class DistributedHerderTest {
expectRebalance(2, Collections.emptyList(), Collections.emptyList());
SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0);
- ClusterConfigState snapshotWithKey = new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(
+ 2,
+ initialKey,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
expectConfigRefreshAndSnapshot(snapshotWithKey);
// Second rebalance: poll indefinitely as worker is follower, so expiration still doesn't come into play
member.poll(Long.MAX_VALUE);
@@ -2761,9 +2820,17 @@ public class DistributedHerderTest {
EasyMock.expect(initialSecretKey.getAlgorithm()).andReturn(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT).anyTimes();
EasyMock.expect(initialSecretKey.getEncoded()).andReturn(new byte[32]).anyTimes();
SessionKey initialKey = new SessionKey(initialSecretKey, time.milliseconds());
- ClusterConfigState snapshotWithKey = new ClusterConfigState(1, initialKey, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(
+ 1,
+ initialKey,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
expectConfigRefreshAndSnapshot(snapshotWithKey);
// First rebalance: poll for a limited time as worker is leader and must wake up for key expiration
member.poll(leq(rotationTtlDelay));
@@ -2962,9 +3029,17 @@ public class DistributedHerderTest {
EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
- ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+ ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(
+ 1,
+ sessionKey,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
// First tick -- after joining the group, we try to write a new session key to
// the config topic, and fail (in this case, we're trying to simulate that we've
@@ -3514,9 +3589,17 @@ public class DistributedHerderTest {
@Test
public void testVerifyTaskGeneration() {
Map<String, Integer> taskConfigGenerations = new HashMap<>();
- herder.configState = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
- TASK_CONFIGS_MAP, Collections.emptyMap(), taskConfigGenerations, Collections.emptySet(), Collections.emptySet());
+ herder.configState = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP,
+ Collections.emptyMap(),
+ taskConfigGenerations,
+ Collections.emptySet(),
+ Collections.emptySet());
Callback<Void> verifyCallback = EasyMock.mock(Callback.class);
for (int i = 0; i < 5; i++) {
@@ -3938,9 +4021,17 @@ public class DistributedHerderTest {
Map<String, Map<String, String>> connectorConfigs = connectors.stream()
.collect(Collectors.toMap(Function.identity(), c -> CONN1_CONFIG));
- return new ClusterConfigState(1, sessionKey, taskCounts,
- connectorConfigs, Collections.singletonMap(CONN1, TargetState.STARTED),
- taskConfigs, taskCountRecords, taskConfigGenerations, pendingFencing, Collections.emptySet());
+ return new ClusterConfigState(
+ 1,
+ sessionKey,
+ taskCounts,
+ connectorConfigs,
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ taskConfigs,
+ taskCountRecords,
+ taskConfigGenerations,
+ pendingFencing,
+ Collections.emptySet());
}
private void expectExecuteTaskReconfiguration(boolean running, ConnectorConfig connectorConfig, IAnswer<List<Map<String, String>>> answer) {