Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/20 00:04:31 UTC

[GitHub] [kafka] jsancio opened a new pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

jsancio opened a new pull request #11084:
URL: https://github.com/apache/kafka/pull/11084


   This commit includes a few changes:
   
   1. The leader assumes that there is always an in-memory snapshot at the
   last committed offset. This means that the controller needs to generate
   an in-memory snapshot when getting promoted from inactive to active.
   
   2. Delete all in-memory snapshots less than the last committed offset
   when the on-disk snapshot is canceled or it completes.
   
   3. The controller always starts inactive. When loading an on-disk
   snapshot the controller is always inactive. This means that we don't
   need to generate an in-memory snapshot at the offset -1 because there
   is no requirement that there exists an in-memory snapshot at the last
   committed offset when the controller is inactive.
   
   4. SnapshotRegistry's createSnapshot should allow the creation of a
   snapshot if the last snapshot's offset is the given offset. This allows
   for simpler client code.
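
As a rough illustration of items 2 and 4, here is a minimal sketch of a registry that only tracks snapshot epochs. `SnapshotRegistrySketch` and its methods are hypothetical stand-ins written for this example, not the actual `SnapshotRegistry` code; the strict "less than" deletion rule is taken from item 2, and rejecting epochs older than the newest snapshot is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified stand-in for a snapshot registry. It only tracks
// snapshot epochs (offsets), kept in increasing order.
final class SnapshotRegistrySketch {
    private final Deque<Long> epochs = new ArrayDeque<>();

    // Returns the existing snapshot epoch if it matches the newest snapshot,
    // registers a new one if it is larger, and rejects anything older
    // (the rejection is an assumption made for this sketch).
    long getOrCreateSnapshot(long epoch) {
        if (!epochs.isEmpty()) {
            long last = epochs.peekLast();
            if (last == epoch) {
                return last; // idempotent: callers need not check first
            }
            if (last > epoch) {
                throw new IllegalArgumentException(
                    "Epoch " + epoch + " is older than the last snapshot " + last);
            }
        }
        epochs.addLast(epoch);
        return epoch;
    }

    // Deletes every snapshot whose epoch is strictly less than targetEpoch.
    void deleteSnapshotsUpTo(long targetEpoch) {
        while (!epochs.isEmpty() && epochs.peekFirst() < targetEpoch) {
            epochs.removeFirst();
        }
    }

    int size() {
        return epochs.size();
    }

    public static void main(String[] args) {
        SnapshotRegistrySketch registry = new SnapshotRegistrySketch();
        registry.getOrCreateSnapshot(10L);
        registry.getOrCreateSnapshot(20L);
        registry.getOrCreateSnapshot(20L); // same epoch as the last snapshot: no-op
        registry.deleteSnapshotsUpTo(20L); // drops 10, keeps 20
        System.out.println(registry.size());
    }
}
```

With the idempotent create, a caller that is promoted to leader can request a snapshot at the last committed offset without first checking whether one already exists.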
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #11084:
URL: https://github.com/apache/kafka/pull/11084


   





[GitHub] [kafka] jsancio commented on a change in pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11084:
URL: https://github.com/apache/kafka/pull/11084#discussion_r672716678



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -302,17 +302,13 @@ private void onUpdateLeaderHighWatermark(
     }
 
     private void updateListenersProgress(long highWatermark) {
-        updateListenersProgress(listenerContexts, highWatermark);
-    }
-
-    private void updateListenersProgress(List<ListenerContext> listenerContexts, long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                 if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
                     SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
                         String.format(
                             "Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s",
-                            listenerContext.listener.getClass().getTypeName(),
+                            listenerContext.listenerName(),

Review comment:
       Note that all of the changes to the `raft` module are cosmetic, mainly to improve logging.







[GitHub] [kafka] cmccabe commented on a change in pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11084:
URL: https://github.com/apache/kafka/pull/11084#discussion_r672715848



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -755,11 +766,22 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                             newEpoch + ", but we never renounced controller epoch " +
                             curEpoch);
                     }
-                    log.warn("Becoming active at controller epoch {}.", newEpoch);
+                    log.info(

Review comment:
       This can fit in two lines... let's try to avoid "exploded" function calls that look like function bodies.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -755,11 +766,22 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                             newEpoch + ", but we never renounced controller epoch " +
                             curEpoch);
                     }
-                    log.warn("Becoming active at controller epoch {}.", newEpoch);
+                    log.info(

Review comment:
       This can fit in two or three lines... let's try to avoid "exploded" function calls that look like function bodies.







[GitHub] [kafka] cmccabe commented on a change in pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11084:
URL: https://github.com/apache/kafka/pull/11084#discussion_r672716671



##########
File path: metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
##########
@@ -178,14 +178,18 @@ public Snapshot getSnapshot(long epoch) {
     /**
      * Creates a new snapshot at the given epoch.
      *
+     * If {@code epoch} already exists and it is the last snapshot then just return that snapshot.

Review comment:
       With this modification, the function should be renamed something like `getOrCreateSnapshot`, right?







[GitHub] [kafka] cmccabe commented on a change in pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11084:
URL: https://github.com/apache/kafka/pull/11084#discussion_r672715848



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -651,20 +661,21 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             // If we are writing a new snapshot, then we need to keep that around;
                             // otherwise, we should delete up to the current committed offset.
                             snapshotRegistry.deleteSnapshotsUpTo(
-                                Math.min(offset, snapshotGeneratorManager.snapshotLastOffsetFromLog()));
+                                snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset)

Review comment:
       newline not needed here

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -651,20 +661,21 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             // If we are writing a new snapshot, then we need to keep that around;
                             // otherwise, we should delete up to the current committed offset.
                             snapshotRegistry.deleteSnapshotsUpTo(
-                                Math.min(offset, snapshotGeneratorManager.snapshotLastOffsetFromLog()));
+                                snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset)

Review comment:
       extra newline not needed here
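
The hunk above replaces `Math.min(offset, ...)` with `OptionalLong.orElse(offset)`. A minimal sketch of that selection logic follows, assuming `snapshotLastOffsetFromLog()` returns an empty `OptionalLong` when no on-disk snapshot is being written; `DeleteUpToSketch` and `deleteUpTo` are hypothetical names used only for this illustration.

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating how the epoch passed to
// deleteSnapshotsUpTo could be chosen.
final class DeleteUpToSketch {
    // If an on-disk snapshot is in progress, keep in-memory snapshots from its
    // offset onward; otherwise delete up to the current committed offset.
    static long deleteUpTo(OptionalLong inProgressSnapshotOffset, long committedOffset) {
        return inProgressSnapshotOffset.orElse(committedOffset);
    }

    public static void main(String[] args) {
        // No snapshot in progress: the committed offset is used.
        System.out.println(deleteUpTo(OptionalLong.empty(), 42L));
        // Snapshot in progress at offset 30: that offset is used instead.
        System.out.println(deleteUpTo(OptionalLong.of(30L), 42L));
    }
}
```

Using an `OptionalLong` makes the "no snapshot in progress" case explicit rather than relying on a sentinel value inside `Math.min`.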
















[GitHub] [kafka] cmccabe commented on a change in pull request #11084: KAFKA-13100: Create a snapshot during leadership promotion

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11084:
URL: https://github.com/apache/kafka/pull/11084#discussion_r672716954



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -651,20 +661,21 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             // If we are writing a new snapshot, then we need to keep that around;
                             // otherwise, we should delete up to the current committed offset.
                             snapshotRegistry.deleteSnapshotsUpTo(
-                                Math.min(offset, snapshotGeneratorManager.snapshotLastOffsetFromLog()));
+                                snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset)

Review comment:
       newline not needed here

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -651,20 +661,21 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                             // If we are writing a new snapshot, then we need to keep that around;
                             // otherwise, we should delete up to the current committed offset.
                             snapshotRegistry.deleteSnapshotsUpTo(
-                                Math.min(offset, snapshotGeneratorManager.snapshotLastOffsetFromLog()));
+                                snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset)

Review comment:
       extra newline not needed here



