You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this copy.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/06/18 16:34:47 UTC
[kafka] branch 2.7 updated: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 91d41e9 KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
91d41e9 is described below
commit 91d41e948fc1d5317d056960b99570718b3ea2c6
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed May 5 17:11:15 2021 -0400
KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
checkstyle/suppressions.xml | 3 +
.../runtime/distributed/DistributedHerder.java | 9 +-
.../runtime/distributed/DistributedHerderTest.java | 140 +++++++++++++++++----
3 files changed, 126 insertions(+), 26 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 97a96c8..1665414 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -142,6 +142,9 @@
<suppress checks="MethodLength"
files="(RequestResponse|WorkerSinkTask)Test.java"/>
+ <suppress checks="JavaNCSS"
+ files="DistributedHerderTest.java"/>
+
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 93f6141..df90c45 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -409,7 +409,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ",
scheduledRebalance, now, nextRequestTimeoutMs);
}
- if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
+ if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) {
nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0));
log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ",
keyExpiration, now, nextRequestTimeoutMs);
@@ -1620,10 +1620,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
synchronized (DistributedHerder.this) {
DistributedHerder.this.sessionKey = sessionKey.key();
- // Track the expiration of the key if and only if this worker is the leader
+ // Track the expiration of the key.
// Followers will receive rotated keys from the leader and won't be responsible for
- // tracking expiration and distributing new keys themselves
- if (isLeader() && keyRotationIntervalMs > 0) {
+ // tracking expiration and distributing new keys themselves, but may become leaders
+ // later on and will need to know when to update the key.
+ if (keyRotationIntervalMs > 0) {
DistributedHerder.this.keyExpiration = sessionKey.creationTimestamp() + keyRotationIntervalMs;
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7ededef..519cda8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -2170,6 +2170,84 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+ EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ // First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList());
+ SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0);
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ expectPostRebalanceCatchup(snapshotWithKey);
+ // Second rebalance: poll indefinitely as worker is follower, so expiration still doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL);
+ Capture<SessionKey> updatedKey = EasyMock.newCapture();
+ configBackingStore.putSessionKey(EasyMock.capture(updatedKey));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ configUpdateListener.onSessionKeyUpdate(updatedKey.getValue());
+ return null;
+ });
+ // Third rebalance: poll for a limited time as worker has become leader and must wake up for key expiration
+ Capture<Long> pollTimeout = EasyMock.newCapture();
+ member.poll(EasyMock.captureLong(pollTimeout));
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick();
+ configUpdateListener.onSessionKeyUpdate(initialKey);
+ herder.tick();
+ herder.tick();
+
+ assertTrue(pollTimeout.getValue() <= DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testKeyRotationDisabledWhenWorkerBecomesFollower() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+ EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList(), "member", MEMBER_URL);
+ SecretKey initialSecretKey = EasyMock.mock(SecretKey.class);
+ EasyMock.expect(initialSecretKey.getAlgorithm()).andReturn(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT).anyTimes();
+ EasyMock.expect(initialSecretKey.getEncoded()).andReturn(new byte[32]).anyTimes();
+ SessionKey initialKey = new SessionKey(initialSecretKey, time.milliseconds());
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(1, initialKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ expectPostRebalanceCatchup(snapshotWithKey);
+ // First rebalance: poll for a limited time as worker is leader and must wake up for key expiration
+ Capture<Long> firstPollTimeout = EasyMock.newCapture();
+ member.poll(EasyMock.captureLong(firstPollTimeout));
+ EasyMock.expectLastCall();
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ // Second rebalance: poll indefinitely as worker is no longer leader, so key expiration doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll(initialSecretKey);
+
+ configUpdateListener.onSessionKeyUpdate(initialKey);
+ herder.tick();
+ assertTrue(firstPollTimeout.getValue() <= DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT);
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
@Test
public void testPutTaskConfigsSignatureNotRequiredV0() {
@@ -2405,6 +2483,14 @@ public class DistributedHerderTest {
ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks, 0);
}
+ private void expectRebalance(final long offset,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks,
+ String leader, String leaderUrl) {
+ expectRebalance(Collections.emptyList(), Collections.emptyList(),
+ ConnectProtocol.Assignment.NO_ERROR, offset, leader, leaderUrl, assignedConnectors, assignedTasks, 0);
+ }
+
// Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
private void expectRebalance(final Collection<String> revokedConnectors,
final List<ConnectorTaskId> revokedTasks,
@@ -2423,30 +2509,40 @@ public class DistributedHerderTest {
final List<String> assignedConnectors,
final List<ConnectorTaskId> assignedTasks,
int delay) {
+ expectRebalance(revokedConnectors, revokedTasks, error, offset, "leader", "leaderUrl", assignedConnectors, assignedTasks, delay);
+ }
+
+ // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
+ private void expectRebalance(final Collection<String> revokedConnectors,
+ final List<ConnectorTaskId> revokedTasks,
+ final short error,
+ final long offset,
+ String leader,
+ String leaderUrl,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks,
+ int delay) {
member.ensureActive();
- PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- ExtendedAssignment assignment;
- if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
- rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
- }
-
- if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
- assignment = new ExtendedAssignment(
- connectProtocolVersion, error, "leader", "leaderUrl", offset,
- assignedConnectors, assignedTasks,
- Collections.emptyList(), Collections.emptyList(), 0);
- } else {
- assignment = new ExtendedAssignment(
- connectProtocolVersion, error, "leader", "leaderUrl", offset,
- new ArrayList<>(assignedConnectors), new ArrayList<>(assignedTasks),
- new ArrayList<>(revokedConnectors), new ArrayList<>(revokedTasks), delay);
- }
- rebalanceListener.onAssigned(assignment, 3);
- time.sleep(100L);
- return null;
+ PowerMock.expectLastCall().andAnswer(() -> {
+ ExtendedAssignment assignment;
+ if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
+ rebalanceListener.onRevoked(leader, revokedConnectors, revokedTasks);
+ }
+
+ if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
+ assignment = new ExtendedAssignment(
+ connectProtocolVersion, error, leader, leaderUrl, offset,
+ assignedConnectors, assignedTasks,
+ Collections.emptyList(), Collections.emptyList(), 0);
+ } else {
+ assignment = new ExtendedAssignment(
+ connectProtocolVersion, error, leader, leaderUrl, offset,
+ new ArrayList<>(assignedConnectors), new ArrayList<>(assignedTasks),
+ new ArrayList<>(revokedConnectors), new ArrayList<>(revokedTasks), delay);
}
+ rebalanceListener.onAssigned(assignment, 3);
+ time.sleep(100L);
+ return null;
});
if (!revokedConnectors.isEmpty()) {