You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/06/18 16:34:47 UTC

[kafka] branch 2.7 updated: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 91d41e9  KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
91d41e9 is described below

commit 91d41e948fc1d5317d056960b99570718b3ea2c6
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed May 5 17:11:15 2021 -0400

    KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 checkstyle/suppressions.xml                        |   3 +
 .../runtime/distributed/DistributedHerder.java     |   9 +-
 .../runtime/distributed/DistributedHerderTest.java | 140 +++++++++++++++++----
 3 files changed, 126 insertions(+), 26 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 97a96c8..1665414 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -142,6 +142,9 @@
     <suppress checks="MethodLength"
               files="(RequestResponse|WorkerSinkTask)Test.java"/>
 
+    <suppress checks="JavaNCSS"
+              files="DistributedHerderTest.java"/>
+
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
               files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 93f6141..df90c45 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -409,7 +409,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ",
                     scheduledRebalance, now, nextRequestTimeoutMs);
         }
-        if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
+        if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
             nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0));
             log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ",
                     keyExpiration, now, nextRequestTimeoutMs);
@@ -1620,10 +1620,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
             synchronized (DistributedHerder.this) {
                 DistributedHerder.this.sessionKey = sessionKey.key();
-                // Track the expiration of the key if and only if this worker is the leader
+                // Track the expiration of the key.
                 // Followers will receive rotated keys from the leader and won't be responsible for
-                // tracking expiration and distributing new keys themselves
-                if (isLeader() && keyRotationIntervalMs > 0) {
+                // tracking expiration and distributing new keys themselves, but may become leaders
+                // later on and will need to know when to update the key.
+                if (keyRotationIntervalMs > 0) {
                     DistributedHerder.this.keyExpiration = sessionKey.creationTimestamp() + keyRotationIntervalMs;
                 }
             }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7ededef..519cda8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -2170,6 +2170,84 @@ public class DistributedHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test // Scenario: a worker that is a follower for two rebalances and is then named leader must rotate the key and poll with a bounded timeout
+    public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member"); // same id is passed as the leader in the third rebalance below
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList()); // no leader given here, so this worker is not named leader
+        expectPostRebalanceCatchup(SNAPSHOT);
+        // First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play
+        member.poll(Long.MAX_VALUE);
+        EasyMock.expectLastCall();
+
+        expectRebalance(2, Collections.emptyList(), Collections.emptyList());
+        SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0); // second argument 0 is presumably the creation timestamp, i.e. a long-expired key; TODO confirm
+        ClusterConfigState snapshotWithKey =  new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
+        expectPostRebalanceCatchup(snapshotWithKey);
+        // Second rebalance: poll indefinitely as worker is follower, so expiration still doesn't come into play
+        member.poll(Long.MAX_VALUE);
+        EasyMock.expectLastCall();
+
+        expectRebalance(2, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL); // same offset as before, but "member" is now the leader
+        Capture<SessionKey> updatedKey = EasyMock.newCapture();
+        configBackingStore.putSessionKey(EasyMock.capture(updatedKey)); // the herder is expected to write a new session key exactly once
+        EasyMock.expectLastCall().andAnswer(() -> {
+            configUpdateListener.onSessionKeyUpdate(updatedKey.getValue()); // echo the written key back to the herder's listener
+            return null;
+        });
+        // Third rebalance: poll for a limited time as worker has become leader and must wake up for key expiration
+        Capture<Long> pollTimeout = EasyMock.newCapture();
+        member.poll(EasyMock.captureLong(pollTimeout));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // first rebalance, before any session key has been delivered
+        configUpdateListener.onSessionKeyUpdate(initialKey); // deliver the initial key between the first and second tick
+        herder.tick(); // second rebalance, still a follower
+        herder.tick(); // third rebalance, now the leader
+
+        assertTrue(pollTimeout.getValue() <= DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT); // bounded timeout, unlike the Long.MAX_VALUE polls above
+
+        PowerMock.verifyAll();
+    }
+
+    @Test // Scenario: a worker that starts as leader and then loses leadership must go back to polling with no key-expiration deadline
+    public void testKeyRotationDisabledWhenWorkerBecomesFollower() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member"); // same id is passed as the leader in the first rebalance below
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL); // this worker is named leader
+        SecretKey initialSecretKey = EasyMock.mock(SecretKey.class);
+        EasyMock.expect(initialSecretKey.getAlgorithm()).andReturn(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT).anyTimes();
+        EasyMock.expect(initialSecretKey.getEncoded()).andReturn(new byte[32]).anyTimes(); // all-zero 32-byte placeholder key material
+        SessionKey initialKey = new SessionKey(initialSecretKey, time.milliseconds()); // stamped with the test clock's current time
+        ClusterConfigState snapshotWithKey =  new ClusterConfigState(1, initialKey, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
+        expectPostRebalanceCatchup(snapshotWithKey);
+        // First rebalance: poll for a limited time as worker is leader and must wake up for key expiration
+        Capture<Long> firstPollTimeout = EasyMock.newCapture();
+        member.poll(EasyMock.captureLong(firstPollTimeout));
+        EasyMock.expectLastCall();
+
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList()); // no leader given here, so this worker is no longer named leader
+        // Second rebalance: poll indefinitely as worker is no longer leader, so key expiration doesn't come into play
+        member.poll(Long.MAX_VALUE);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll(initialSecretKey); // mock was created via EasyMock.mock, so it is passed explicitly; presumably not tracked otherwise
+
+        configUpdateListener.onSessionKeyUpdate(initialKey); // deliver the key before the first tick
+        herder.tick(); // first rebalance, as leader
+        assertTrue(firstPollTimeout.getValue() <= DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT); // bounded timeout while leading
+        herder.tick(); // second rebalance, as follower; must match the Long.MAX_VALUE poll expectation
+
+        PowerMock.verifyAll();
+    }
 
     @Test
     public void testPutTaskConfigsSignatureNotRequiredV0() {
@@ -2405,6 +2483,14 @@ public class DistributedHerderTest {
                 ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks, 0);
     }
 
+    private void expectRebalance(final long offset, // convenience overload: assignment-only rebalance with a caller-chosen leader
+                                 final List<String> assignedConnectors,
+                                 final List<ConnectorTaskId> assignedTasks,
+                                 String leader, String leaderUrl) { // forwarded unchanged to the full overload
+        expectRebalance(Collections.emptyList(), Collections.emptyList(), // no revoked connectors, no revoked tasks
+                ConnectProtocol.Assignment.NO_ERROR, offset, leader, leaderUrl, assignedConnectors, assignedTasks, 0); // trailing 0 is the delay argument
+    }
+
     // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
     private void expectRebalance(final Collection<String> revokedConnectors,
                                  final List<ConnectorTaskId> revokedTasks,
@@ -2423,30 +2509,40 @@ public class DistributedHerderTest {
                                  final List<String> assignedConnectors,
                                  final List<ConnectorTaskId> assignedTasks,
                                  int delay) {
+        expectRebalance(revokedConnectors, revokedTasks, error, offset, "leader", "leaderUrl", assignedConnectors, assignedTasks, delay);
+    }
+
+    // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
+    private void expectRebalance(final Collection<String> revokedConnectors,
+                                 final List<ConnectorTaskId> revokedTasks,
+                                 final short error,
+                                 final long offset,
+                                 String leader,
+                                 String leaderUrl,
+                                 final List<String> assignedConnectors,
+                                 final List<ConnectorTaskId> assignedTasks,
+                                 int delay) {
         member.ensureActive();
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                ExtendedAssignment assignment;
-                if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
-                    rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
-                }
-
-                if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
-                    assignment = new ExtendedAssignment(
-                            connectProtocolVersion, error, "leader", "leaderUrl", offset,
-                            assignedConnectors, assignedTasks,
-                            Collections.emptyList(), Collections.emptyList(), 0);
-                } else {
-                    assignment = new ExtendedAssignment(
-                            connectProtocolVersion, error, "leader", "leaderUrl", offset,
-                            new ArrayList<>(assignedConnectors), new ArrayList<>(assignedTasks),
-                            new ArrayList<>(revokedConnectors), new ArrayList<>(revokedTasks), delay);
-                }
-                rebalanceListener.onAssigned(assignment, 3);
-                time.sleep(100L);
-                return null;
+        PowerMock.expectLastCall().andAnswer(() -> {
+            ExtendedAssignment assignment;
+            if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
+                rebalanceListener.onRevoked(leader, revokedConnectors, revokedTasks);
+            }
+
+            if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
+                assignment = new ExtendedAssignment(
+                        connectProtocolVersion, error, leader, leaderUrl, offset,
+                        assignedConnectors, assignedTasks,
+                        Collections.emptyList(), Collections.emptyList(), 0);
+            } else {
+                assignment = new ExtendedAssignment(
+                        connectProtocolVersion, error, leader, leaderUrl, offset,
+                        new ArrayList<>(assignedConnectors), new ArrayList<>(assignedTasks),
+                        new ArrayList<>(revokedConnectors), new ArrayList<>(revokedTasks), delay);
             }
+            rebalanceListener.onAssigned(assignment, 3);
+            time.sleep(100L);
+            return null;
         });
 
         if (!revokedConnectors.isEmpty()) {