You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/10/18 22:09:27 UTC

[kafka] branch trunk updated: KAFKA-14300; Generate snapshot after repeated controller resign (#12747)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7dc17908de5 KAFKA-14300; Generate snapshot after repeated controller resign (#12747)
7dc17908de5 is described below

commit 7dc17908de540b461b67b71652cef652adc488b0
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Tue Oct 18 15:09:20 2022 -0700

    KAFKA-14300; Generate snapshot after repeated controller resign (#12747)
    
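    Resetting the counter `newBytesSinceLastSnapshot` (renamed to `committedBytesSinceLastSnapshot` in this change) to 0 when the controller resigns can prevent it from generating a snapshot after `snapshotMaxNewRecordBytes` committed bytes have been replayed. If the controller resigns repeatedly before the threshold is reached, the counter is zeroed each time and never accumulates enough bytes to trigger a snapshot.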
    
    This change fixes that by simply not resetting the counter during resignation. This is correct because the counter tracks the number of committed bytes that have been replayed but are not yet included in the latest snapshot. Reverting to the last committed state during resignation only discards uncommitted changes, so it does not invalidate this value.
    
    Reviewers: Colin Patrick McCabe <cm...@apache.org>
---
 .../apache/kafka/controller/QuorumController.java  | 59 +++++++++++++------
 .../apache/kafka/timeline/SnapshotRegistry.java    |  2 +-
 .../kafka/controller/QuorumControllerTest.java     | 67 ++++++++++++++++++++++
 .../org/apache/kafka/metalog/LocalLogManager.java  |  3 +
 4 files changed, 111 insertions(+), 20 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index d132a4b6be1..9aaea73082c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -980,11 +980,15 @@ public final class QuorumController implements Controller {
                                 i++;
                             }
                         }
-                        updateLastCommittedState(offset, epoch, batch.appendTimestamp());
-                        processedRecordsSize += batch.sizeInBytes();
+                        updateLastCommittedState(
+                            offset,
+                            epoch,
+                            batch.appendTimestamp(),
+                            committedBytesSinceLastSnapshot + batch.sizeInBytes()
+                        );
                     }
 
-                    maybeGenerateSnapshot(processedRecordsSize);
+                    maybeGenerateSnapshot();
                 } finally {
                     reader.close();
                 }
@@ -1039,10 +1043,10 @@ public final class QuorumController implements Controller {
                     updateLastCommittedState(
                         reader.lastContainedLogOffset(),
                         reader.lastContainedLogEpoch(),
-                        reader.lastContainedLogTimestamp()
+                        reader.lastContainedLogTimestamp(),
+                        0
                     );
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-                    newBytesSinceLastSnapshot = 0L;
                     authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
                 } finally {
                     reader.close();
@@ -1194,10 +1198,16 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void updateLastCommittedState(long offset, int epoch, long timestamp) {
+    private void updateLastCommittedState(
+        long offset,
+        int epoch,
+        long timestamp,
+        long bytesSinceLastSnapshot
+    ) {
         lastCommittedOffset = offset;
         lastCommittedEpoch = epoch;
         lastCommittedTimestamp = timestamp;
+        committedBytesSinceLastSnapshot = bytesSinceLastSnapshot;
 
         controllerMetrics.setLastCommittedRecordOffset(offset);
         if (!isActiveController()) {
@@ -1223,7 +1233,6 @@ public final class QuorumController implements Controller {
             }
             snapshotRegistry.revertToSnapshot(lastCommittedOffset);
             authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-            newBytesSinceLastSnapshot = 0L;
             updateWriteOffset(-1);
             clusterControl.deactivate();
             cancelMaybeFenceReplicas();
@@ -1445,9 +1454,8 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void maybeGenerateSnapshot(long batchSizeInBytes) {
-        newBytesSinceLastSnapshot += batchSizeInBytes;
-        if (newBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
+    private void maybeGenerateSnapshot() {
+        if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
             snapshotGeneratorManager.generator == null
         ) {
             if (!isActiveController()) {
@@ -1457,11 +1465,20 @@ public final class QuorumController implements Controller {
                 snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
             }
 
-            log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
-                lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot, SnapshotReason.MaxBytesExceeded);
+            log.info(
+                "Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
+                lastCommittedEpoch,
+                lastCommittedOffset,
+                committedBytesSinceLastSnapshot,
+                SnapshotReason.MaxBytesExceeded
+            );
 
-            snapshotGeneratorManager.createSnapshotGenerator(lastCommittedOffset, lastCommittedEpoch, lastCommittedTimestamp);
-            newBytesSinceLastSnapshot = 0;
+            snapshotGeneratorManager.createSnapshotGenerator(
+                lastCommittedOffset,
+                lastCommittedEpoch,
+                lastCommittedTimestamp
+            );
+            committedBytesSinceLastSnapshot = 0;
         }
     }
 
@@ -1472,8 +1489,7 @@ public final class QuorumController implements Controller {
         snapshotGeneratorManager.cancel();
         snapshotRegistry.reset();
 
-        newBytesSinceLastSnapshot = 0;
-        updateLastCommittedState(-1, -1, -1);
+        updateLastCommittedState(-1, -1, -1, 0);
     }
 
     /**
@@ -1646,7 +1662,7 @@ public final class QuorumController implements Controller {
     /**
      * Number of bytes processed through handling commits since the last snapshot was generated.
      */
-    private long newBytesSinceLastSnapshot = 0;
+    private long committedBytesSinceLastSnapshot = 0;
 
     /**
      * How long to delay partition leader balancing operations.
@@ -2101,8 +2117,13 @@ public final class QuorumController implements Controller {
         CompletableFuture<Long> future = new CompletableFuture<>();
         appendControlEvent("beginWritingSnapshot", () -> {
             if (snapshotGeneratorManager.generator == null) {
-                log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
-                    lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot, SnapshotReason.UnknownReason);
+                log.info(
+                    "Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
+                    lastCommittedEpoch,
+                    lastCommittedOffset,
+                    committedBytesSinceLastSnapshot,
+                    SnapshotReason.UnknownReason
+                );
                 snapshotGeneratorManager.createSnapshotGenerator(
                     lastCommittedOffset,
                     lastCommittedEpoch,
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 49d62fd2117..0df9c8b8f56 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -237,7 +237,7 @@ public class SnapshotRegistry {
         } else {
             snapshot.erase();
         }
-        log.debug("Deleting snapshot {}", snapshot.epoch());
+        log.debug("Deleting in-memory snapshot {}", snapshot.epoch());
         snapshots.remove(snapshot.epoch(), snapshot);
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a5e7258a724..90807a2e2a7 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -821,6 +821,73 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testSnapshotAfterRepeatedResign() throws Throwable {
+        final int numBrokers = 4;
+        final int maxNewRecordBytes = 1000;
+        Map<Integer, Long> brokerEpochs = new HashMap<>();
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                    controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
+                }).
+                build();
+        ) {
+            QuorumController active = controlEnv.activeController();
+            for (int i = 0; i < numBrokers; i++) {
+                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(i).
+                        setRack(null).
+                        setClusterId(active.clusterId()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
+                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
+                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
+                            setName("PLAINTEXT").setHost("localhost").
+                            setPort(9092 + i)).iterator()))).get();
+                brokerEpochs.put(i, reply.epoch());
+                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
+                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
+                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
+            }
+
+            assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
+                String.format("%s appended bytes is not less than %s max new record bytes",
+                    logEnv.appendedBytes(),
+                    maxNewRecordBytes));
+
+            // Keep creating topics and resigning the leader until we reach the max bytes limit
+            int counter = 0;
+            while (logEnv.appendedBytes() < maxNewRecordBytes) {
+                active = controlEnv.activeController();
+
+                counter += 1;
+                String topicName = String.format("foo-%s", counter);
+                active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new CreatableTopic().setName(topicName).setNumPartitions(-1).
+                                setReplicationFactor((short) -1).
+                                setAssignments(new CreatableReplicaAssignmentCollection(
+                                    Arrays.asList(new CreatableReplicaAssignment().
+                                        setPartitionIndex(0).
+                                        setBrokerIds(Arrays.asList(0, 1, 2)),
+                                    new CreatableReplicaAssignment().
+                                        setPartitionIndex(1).
+                                        setBrokerIds(Arrays.asList(1, 2, 0))).
+                                            iterator()))).iterator())),
+                    Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
+
+                LocalLogManager activeLocalLogManager = logEnv.logManagers().get(active.nodeId());
+                activeLocalLogManager.resign(activeLocalLogManager.leaderAndEpoch().epoch());
+            }
+            logEnv.waitForLatestSnapshot();
+        }
+    }
+
     private SnapshotReader<ApiMessageAndVersion> createSnapshotReader(RawSnapshotReader reader) {
         return RecordsSnapshotReader.of(
             reader,
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index e24d86bd873..6457ede9ba2 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -406,7 +406,10 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
         }
 
         void handleLeaderChange(long offset, LeaderAndEpoch leader) {
+            // Simulate KRaft implementation by first bumping the epoch before assigning a leader
+            listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch()));
             listener.handleLeaderChange(leader);
+
             notifiedLeader = leader;
             this.offset = offset;
         }