You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/04/01 23:24:26 UTC

[kafka] branch 2.8 updated: KAFKA-12474: Handle failure to write new session keys gracefully (#10396)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new d91d64f  KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
d91d64f is described below

commit d91d64f2ca1e0f39ffe14b8056edcab1be837df2
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu Apr 1 13:26:01 2021 -0400

    KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
    
    If a distributed worker fails to write a new session key to the config topic (or fails to read it back afterward), it dies. This fix softens the blow a bit by instead restarting the herder tick loop and forcing a read to the end of the config topic, retrying that read until it succeeds.
    
    Once that read succeeds, if the worker had in fact written a new session key on its first attempt, it will have read that key back from the config topic and will not write another key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader).
    
    Verified with new unit tests for both cases (failure to write, failure to read back after write).
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     | 20 ++++--
 .../runtime/distributed/DistributedHerderTest.java | 82 ++++++++++++++++++++++
 2 files changed, 97 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 16dfbf9..c312e05 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -363,10 +363,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         if (checkForKeyRotation(now)) {
             log.debug("Distributing new session key");
             keyExpiration = Long.MAX_VALUE;
-            configBackingStore.putSessionKey(new SessionKey(
-                keyGenerator.generateKey(),
-                now
-            ));
+            try {
+                configBackingStore.putSessionKey(new SessionKey(
+                    keyGenerator.generateKey(),
+                    now
+                ));
+            } catch (Exception e) {
+                log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying");
+                canReadConfigs = false;
+                return;
+            }
         }
 
         // Process any external requests
@@ -1173,7 +1179,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
      * @return true if successful, false if timed out
      */
     private boolean readConfigToEnd(long timeoutMs) {
-        log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
+        if (configState.offset() < assignment.offset()) {
+            log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
+        } else {
+            log.info("Reading to end of config log; current config state offset: {}", configState.offset());
+        }
         try {
             configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
             configState = configBackingStore.snapshot();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index e31a03f..65ec89c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -23,11 +23,13 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.SessionKey;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
@@ -67,6 +69,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import javax.crypto.SecretKey;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -84,8 +87,10 @@ import java.util.concurrent.TimeoutException;
 import static java.util.Collections.singletonList;
 import static javax.ws.rs.core.Response.Status.FORBIDDEN;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.newCapture;
 import static org.junit.Assert.assertEquals;
@@ -2177,6 +2182,83 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testFailedToWriteSessionKey() throws Exception {
+        // First tick -- after joining the group, we try to write a new
+        // session key to the config topic, and fail
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        configBackingStore.putSessionKey(anyObject(SessionKey.class));
+        EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));
+
+        // Second tick -- we read to the end of the config topic first,
+        // then ensure we're still active in the group
+        // then try a second time to write a new session key,
+        // then finally begin polling for group activity
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        configBackingStore.putSessionKey(anyObject(SessionKey.class));
+        EasyMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
+        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+        EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
+        EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
+        SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
+        ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.emptySet());
+
+        // First tick -- after joining the group, we try to write a new session key to
+        // the config topic, and fail (in this case, we're trying to simulate that we've
+        // actually written the key successfully, but haven't been able to read it back
+        // from the config topic, so to the herder it looks the same as if it'd just failed
+        // to write the key)
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        configBackingStore.putSessionKey(anyObject(SessionKey.class));
+        EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));
+
+        // Second tick -- we read to the end of the config topic first, and pick up
+        // the session key that we were able to write the last time,
+        // then ensure we're still active in the group
+        // then finally begin polling for group activity
+        // Importantly, we do not try to write a new session key this time around
+        configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            configUpdateListener.onSessionKeyUpdate(sessionKey);
+            return null;
+        });
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithSessionKey);
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll(secretKey);
+
+        herder.tick();
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testKeyExceptionDetection() {
         assertFalse(herder.isPossibleExpiredKeyException(
             time.milliseconds(),