You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/28 17:50:55 UTC

[kafka] branch trunk updated: MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder (#13286)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7322f4cd55d MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder (#13286)
7322f4cd55d is described below

commit 7322f4cd55dc08abdc6ccf51ed33f7f0d869dd0e
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Tue Feb 28 09:50:44 2023 -0800

    MINOR: reformat ClusterConfigState constructions in Abstract & DistributedHerder (#13286)
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/runtime/AbstractHerderTest.java  |  28 +++-
 .../runtime/distributed/DistributedHerderTest.java | 157 ++++++++++++++++-----
 2 files changed, 146 insertions(+), 39 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 032f09322d7..7cb0b056b9f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -129,12 +129,28 @@ public class AbstractHerderTest {
         TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
         TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
     }
-    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
-    private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
 
     private final String workerId = "workerId";
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 722be88fb7e..03aadd31166 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -193,15 +193,39 @@ public class DistributedHerderTest {
         TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
         TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
     }
-    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
-    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.PAUSED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
-    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.PAUSED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
 
     private static final String WORKER_ID = "localhost:8083";
     private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@@ -623,9 +647,17 @@ public class DistributedHerderTest {
             // Same as SNAPSHOT, except with an updated offset
             // Allow the task to read to the end of the topic and complete the rebalance
             ClusterConfigState secondSnapshot = new ClusterConfigState(
-                configOffset, null, Collections.singletonMap(CONN1, 3),
-                Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-                TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+                    configOffset,
+                    null,
+                    Collections.singletonMap(CONN1, 3),
+                    Collections.singletonMap(CONN1, CONN1_CONFIG),
+                    Collections.singletonMap(CONN1, TargetState.STARTED),
+                    TASK_CONFIGS_MAP,
+                    Collections.emptyMap(),
+                    Collections.emptyMap(),
+                    Collections.emptySet(),
+                    Collections.emptySet()
+            );
             expectConfigRefreshAndSnapshot(secondSnapshot);
         }
         member.requestRejoin();
@@ -1962,9 +1994,19 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
         // During the next tick, throw an error from the transformer
-        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-                Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-                TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), configTransformer);
+        ClusterConfigState snapshotWithTransform = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                configTransformer
+        );
         EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithTransform);
         EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
             .andThrow(new ConfigException("Simulated exception thrown during config transformation"));
@@ -2558,9 +2600,18 @@ public class DistributedHerderTest {
         EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
             .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info"));
         EasyMock.replay(configTransformer);
-        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), configTransformer);
+        ClusterConfigState snapshotWithTransform = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                configTransformer);
 
         expectConfigRefreshAndSnapshot(snapshotWithTransform);
 
@@ -2721,9 +2772,17 @@ public class DistributedHerderTest {
 
         expectRebalance(2, Collections.emptyList(), Collections.emptyList());
         SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0);
-        ClusterConfigState snapshotWithKey =  new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+        ClusterConfigState snapshotWithKey =  new ClusterConfigState(
+                2,
+                initialKey,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
         expectConfigRefreshAndSnapshot(snapshotWithKey);
         // Second rebalance: poll indefinitely as worker is follower, so expiration still doesn't come into play
         member.poll(Long.MAX_VALUE);
@@ -2761,9 +2820,17 @@ public class DistributedHerderTest {
         EasyMock.expect(initialSecretKey.getAlgorithm()).andReturn(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT).anyTimes();
         EasyMock.expect(initialSecretKey.getEncoded()).andReturn(new byte[32]).anyTimes();
         SessionKey initialKey = new SessionKey(initialSecretKey, time.milliseconds());
-        ClusterConfigState snapshotWithKey =  new ClusterConfigState(1, initialKey, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+        ClusterConfigState snapshotWithKey =  new ClusterConfigState(
+                1,
+                initialKey,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
         expectConfigRefreshAndSnapshot(snapshotWithKey);
         // First rebalance: poll for a limited time as worker is leader and must wake up for key expiration
         member.poll(leq(rotationTtlDelay));
@@ -2962,9 +3029,17 @@ public class DistributedHerderTest {
         EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
         EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
         SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
-        ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+        ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(
+                1,
+                sessionKey,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
 
         // First tick -- after joining the group, we try to write a new session key to
         // the config topic, and fail (in this case, we're trying to simulate that we've
@@ -3514,9 +3589,17 @@ public class DistributedHerderTest {
     @Test
     public void testVerifyTaskGeneration() {
         Map<String, Integer> taskConfigGenerations = new HashMap<>();
-        herder.configState = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-                Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
-                TASK_CONFIGS_MAP, Collections.emptyMap(), taskConfigGenerations, Collections.emptySet(), Collections.emptySet());
+        herder.configState = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                taskConfigGenerations,
+                Collections.emptySet(),
+                Collections.emptySet());
 
         Callback<Void> verifyCallback = EasyMock.mock(Callback.class);
         for (int i = 0; i < 5; i++) {
@@ -3938,9 +4021,17 @@ public class DistributedHerderTest {
         Map<String, Map<String, String>> connectorConfigs = connectors.stream()
                 .collect(Collectors.toMap(Function.identity(), c -> CONN1_CONFIG));
 
-        return new ClusterConfigState(1, sessionKey, taskCounts,
-                connectorConfigs, Collections.singletonMap(CONN1, TargetState.STARTED),
-                taskConfigs, taskCountRecords, taskConfigGenerations, pendingFencing, Collections.emptySet());
+        return new ClusterConfigState(
+                1,
+                sessionKey,
+                taskCounts,
+                connectorConfigs,
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                taskConfigs,
+                taskCountRecords,
+                taskConfigGenerations,
+                pendingFencing,
+                Collections.emptySet());
     }
 
     private void expectExecuteTaskReconfiguration(boolean running, ConnectorConfig connectorConfig, IAnswer<List<Map<String, String>>> answer) {