You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/04/01 19:56:47 UTC
[kafka] branch 2.5 updated: KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new d5e85ef KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
d5e85ef is described below
commit d5e85efb95fdf0e2a0a446d5d789402799deb148
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu Apr 1 13:26:01 2021 -0400
KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
If a distributed worker fails to write a new session key to the config topic (or to read it back afterwards), it dies. This fix softens the blow: instead of dying, the worker restarts the herder tick loop and forces a read to the end of the config topic, retrying until that read succeeds.
Once the read to the end succeeds, there are two cases. If the worker did successfully write a new session key on its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write the key at all, it will try again to write a new key (provided it is still the leader).
Verified with new unit tests for both cases (failure to write, failure to read back after write).
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
.../runtime/distributed/DistributedHerder.java | 20 ++++--
.../runtime/distributed/DistributedHerderTest.java | 82 ++++++++++++++++++++++
2 files changed, 97 insertions(+), 5 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 6e8b6d7..bd0fbe7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -358,10 +358,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (checkForKeyRotation(now)) {
keyExpiration = Long.MAX_VALUE;
- configBackingStore.putSessionKey(new SessionKey(
- keyGenerator.generateKey(),
- now
- ));
+ try {
+ configBackingStore.putSessionKey(new SessionKey(
+ keyGenerator.generateKey(),
+ now
+ ));
+ } catch (Exception e) {
+ log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying");
+ canReadConfigs = false;
+ return;
+ }
}
// Process any external requests
@@ -1121,7 +1127,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
* @return true if successful, false if timed out
*/
private boolean readConfigToEnd(long timeoutMs) {
- log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
+ if (configState.offset() < assignment.offset()) {
+ log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
+ } else {
+ log.info("Reading to end of config log; current config state offset: {}", configState.offset());
+ }
try {
configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
configState = configBackingStore.snapshot();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 2a96850..bd93e33 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -26,11 +26,13 @@ import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
@@ -70,6 +72,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
+import javax.crypto.SecretKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -87,8 +90,10 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.singletonList;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
@@ -2064,6 +2069,83 @@ public class DistributedHerderTest {
}
@Test
+ public void testFailedToWriteSessionKey() throws Exception {
+ // First tick -- after joining the group, we try to write a new
+ // session key to the config topic, and fail
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+ EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ configBackingStore.putSessionKey(anyObject(SessionKey.class));
+ EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));
+
+ // Second tick -- we read to the end of the config topic first,
+ // then ensure we're still active in the group
+ // then try a second time to write a new session key,
+ // then finally begin polling for group activity
+ expectPostRebalanceCatchup(SNAPSHOT);
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ configBackingStore.putSessionKey(anyObject(SessionKey.class));
+ EasyMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick();
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
+ SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+ EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
+ EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
+ SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
+ ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.emptySet());
+
+ // First tick -- after joining the group, we try to write a new session key to
+ // the config topic, and fail (in this case, we're trying to simulate that we've
+ // actually written the key successfully, but haven't been able to read it back
+ // from the config topic, so to the herder it looks the same as if it'd just failed
+ // to write the key)
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+ EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ configBackingStore.putSessionKey(anyObject(SessionKey.class));
+ EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));
+
+ // Second tick -- we read to the end of the config topic first, and pick up
+ // the session key that we were able to write the last time,
+ // then ensure we're still active in the group
+ // then finally begin polling for group activity
+ // Importantly, we do not try to write a new session key this time around
+ configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ configUpdateListener.onSessionKeyUpdate(sessionKey);
+ return null;
+ });
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithSessionKey);
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll(secretKey);
+
+ herder.tick();
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testKeyExceptionDetection() {
assertFalse(herder.isPossibleExpiredKeyException(
time.milliseconds(),