Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/01 22:26:46 UTC

[GitHub] [kafka] gharris1727 commented on a change in pull request #10014: KAFKA-12252 and KAFKA-12262: Fix key rotation when leadership changes

gharris1727 commented on a change in pull request #10014:
URL: https://github.com/apache/kafka/pull/10014#discussion_r568174791



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -381,7 +381,7 @@ public void tick() {
             log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ",
                     scheduledRebalance, now, nextRequestTimeoutMs);
         }
-        if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
+        if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {

Review comment:
      I don't think the `keyExpiration < MAX_VALUE` condition is strictly necessary any more, since if we're the leader, we must have a valid key by this point. It made sense before, as it was basically an implicit `isLeader()` check; the explicit check happened much earlier, during the callback. Now, we either read a key before, or added one at the beginning of this loop (and read to the end of the log, executing all callbacks, including the one that updates `keyExpiration`).
   Neither the `isLeader()` nor the `internalRequestValidationEnabled()` condition should change between the `checkForKeyRotation(now)` call and here, so I don't think there's any danger of hitting this condition when `checkForKeyRotation(now)` didn't ensure that we have a key.
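
To make the ordering argument concrete, here is a heavily simplified, hypothetical model of a single tick. `KeyRotationSketch`, its fields, and the one-hour `KEY_TTL_MS` are assumptions for illustration, not the real `DistributedHerder` code. The only property the sketch relies on is the one described above: `putSessionKey()` runs the update callback before returning, so a leader always has a finite `keyExpiration` by the time the timeout is computed.

```java
// Hypothetical, heavily simplified model of the tick ordering discussed above.
// None of these names are the real DistributedHerder fields or methods.
class KeyRotationSketch {
    static final long KEY_TTL_MS = 60 * 60 * 1000L; // assumed one-hour key TTL

    boolean isLeader = false;
    boolean validationEnabled = true;
    long keyExpiration = Long.MAX_VALUE; // Long.MAX_VALUE means "no key read yet"

    // Stand-in for the update callback: records when the new key expires.
    void onSessionKeyUpdate(long keyCreationMs) {
        keyExpiration = keyCreationMs + KEY_TTL_MS;
    }

    // Stand-in for putSessionKey(): writes the key, reads to the end of the log,
    // and runs the callback before returning, so keyExpiration is set on return.
    void putSessionKey(long keyCreationMs) {
        onSessionKeyUpdate(keyCreationMs);
    }

    // A leader with no key, or with an expired key, installs a fresh one.
    void checkForKeyRotation(long now) {
        if (isLeader && validationEnabled
                && (keyExpiration == Long.MAX_VALUE || keyExpiration <= now)) {
            putSessionKey(now);
        }
    }

    // Returns the poll timeout this tick would use.
    long tick(long now) {
        long nextRequestTimeoutMs = Long.MAX_VALUE;
        checkForKeyRotation(now);
        // No extra keyExpiration < Long.MAX_VALUE guard: the same two conditions
        // just guaranteed that a leader has a finite expiration.
        if (isLeader && validationEnabled) {
            nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0));
        }
        return nextRequestTimeoutMs;
    }
}
```

In this model a follower's tick leaves the timeout at `Long.MAX_VALUE`, while a leader's first tick installs a key and returns a timeout no larger than the TTL, which mirrors what the third `tick()` in the new test asserts.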
   
   If `putSessionKey()` were ever changed to not read back the key and ensure that callbacks were executed before returning, then this condition would guard against that case, and prevent us from scheduling a `nextRequestTimeoutMs` 50y earlier than the otherwise very reasonable 292 million years in the future. We would "forget" to schedule a key expiration because of a race condition while installing a new key, and only schedule a rebalance the next time a rebalance occurred due to some other factor.
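
The figures in the previous paragraph can be checked with a little arithmetic. This sketch assumes 365-day years and uses the posting timestamp above (2021-02-01T22:26:46Z) as `now`; `TimeoutArithmetic` is a hypothetical helper, not Kafka code. Without the guard, an unset `keyExpiration` of `Long.MAX_VALUE` would yield a timeout of `Long.MAX_VALUE - now`, which is shorter than `Long.MAX_VALUE` by exactly `now`, i.e. about half a century.

```java
// Hypothetical helper that checks the "292 million years" and "~50y earlier" figures.
class TimeoutArithmetic {
    // 365-day years; ignoring leap days is fine for an order-of-magnitude check
    static final long MS_PER_YEAR = 365L * 24 * 60 * 60 * 1000;

    // How far in the future a Long.MAX_VALUE millisecond timeout lies
    static long yearsUntilMaxTimeout() {
        return Long.MAX_VALUE / MS_PER_YEAR;
    }

    // With the guard, no expiration is scheduled and the timeout stays at Long.MAX_VALUE.
    // Without it, an unset keyExpiration gives Long.MAX_VALUE - now instead.
    static long yearsEarlier(long nowMs) {
        long withGuard = Long.MAX_VALUE;
        long withoutGuard = Long.MAX_VALUE - nowMs;
        return (withGuard - withoutGuard) / MS_PER_YEAR;
    }

    public static void main(String[] args) {
        long now = 1_612_218_406_000L; // 2021-02-01T22:26:46Z
        System.out.println(yearsUntilMaxTimeout()); // about 292 million
        System.out.println(yearsEarlier(now));      // 51
    }
}
```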
   
   Seems harmless to remove, and it only changes behavior in a case where we've introduced a (worse) concurrency bug. Removing it will also keep someone from wondering "why is that condition there?" at some point in the future :)

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##########
@@ -2161,6 +2163,83 @@ public Boolean answer() throws Throwable {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        // First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play
+        member.poll(Long.MAX_VALUE);
+        EasyMock.expectLastCall();
+
+        expectRebalance(2, Collections.emptyList(), Collections.emptyList());
+        SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0);
+        ClusterConfigState snapshotWithKey =  new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
+        expectPostRebalanceCatchup(snapshotWithKey);
+        // Second rebalance: poll indefinitely as worker is follower, so expiration still doesn't come into play
+        member.poll(Long.MAX_VALUE);
+        EasyMock.expectLastCall();
+
+        expectRebalance(2, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL);
+        Capture<SessionKey> updatedKey = EasyMock.newCapture();
+        configBackingStore.putSessionKey(EasyMock.capture(updatedKey));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            configUpdateListener.onSessionKeyUpdate(updatedKey.getValue());
+            return null;
+        });
+        // Third rebalance: poll for a limited time as worker has become leader and must wake up for key expiration
+        Capture<Long> pollTimeout = EasyMock.newCapture();
+        member.poll(EasyMock.captureLong(pollTimeout));
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        configUpdateListener.onSessionKeyUpdate(initialKey);
+        herder.tick();
+        herder.tick();
+
+        assertTrue(pollTimeout.getValue() <= DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT);
+    }

Review comment:
      nit: Do these tests need a `PowerMock.verifyAll()` at the end? I see a `PowerMock.replayAll()` call, but no `PowerMock.expectLastCall()` or anything else that makes it seem like PowerMock is being used functionally here.
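
For context on why a trailing verify matters: in EasyMock-style tests, replay mode makes unexpected calls fail immediately, but expectations that were recorded and never exercised are only reported by a verify step (`PowerMock.verifyAll()` here, as the preceding test in this file does). The toy below is a hand-rolled illustration of those record/replay/verify phases; `ToyMock` and its methods are hypothetical and are not the EasyMock or PowerMock API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hand-rolled illustration of record/replay/verify; NOT the EasyMock or PowerMock API.
class ToyMock {
    private final Deque<String> expected = new ArrayDeque<>();
    private boolean replaying = false;

    // Record phase: queue up an expected call
    void expect(String call) {
        if (replaying) throw new IllegalStateException("already replaying");
        expected.addLast(call);
    }

    // Switch from recording to replaying
    void replay() {
        replaying = true;
    }

    // Replay phase: unexpected or out-of-order calls fail immediately (strict-style)
    void call(String name) {
        if (!replaying) throw new IllegalStateException("not replaying yet");
        String next = expected.pollFirst();
        if (!name.equals(next)) throw new AssertionError("unexpected call: " + name);
    }

    // Only verify() notices expectations that were recorded but never exercised
    void verify() {
        if (!expected.isEmpty()) throw new AssertionError("missing calls: " + expected);
    }
}
```

In the toy, a test that stops after replaying and exercising its calls would pass even if a recorded `poll` never happened; only `verify()` reports it.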