Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/06 00:34:14 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8820: KAFKA-10097: Internalize checkpoint data

mjsax commented on a change in pull request #8820:
URL: https://github.com/apache/kafka/pull/8820#discussion_r436215899



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -562,32 +561,41 @@ public void closeAndRecycleState() {
                 if (clean) {
                     stateMgr.flush();
                     recordCollector.flush();
-                    checkpoint = checkpointableOffsets();
+
+                    scheduleCheckpoint(checkpointableOffsets());
                 } else {
-                    checkpoint = null; // `null` indicates to not write a checkpoint
                     executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
                 }
 
                 break;
 
             case RESTORING:
                 executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-                checkpoint = Collections.emptyMap();
+                scheduleCheckpoint(emptyMap());
 
                 break;
 
             case SUSPENDED:
             case CLOSED:
                 // not need to checkpoint, since when suspending we've already committed the state
-                checkpoint = null; // `null` indicates to not write a checkpoint
-
                 break;
 
             default:
                 throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id);
         }
+    }
 
-        return checkpoint;
+    private void scheduleCheckpoint(final Map<TopicPartition, Long> checkpoint) {
+        this.checkpoint = checkpoint;
+        this.checkpointNeeded = true;
+    }
+
+    private void writeCheckpoint() {

Review comment:
   It might be easier to turn this into `writeCheckpointIfNeeded` and move the flag check inside the method:
   ```
       private void writeCheckpointIfNeeded() {
           if (checkpointNeeded) {
               stateMgr.checkpoint(checkpoint);
               checkpointNeeded = false;
           }
       }
   ```
   
   That way, the caller would not need to check the `checkpointNeeded` flag itself?
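
   To illustrate the suggestion, here is a minimal, self-contained sketch of the schedule-then-write-if-needed pattern. It is not the actual `StreamTask` code: `CheckpointStore`, `RecordingCheckpointStore`, and `CheckpointingTask` are hypothetical stand-ins, and plain `String` keys replace `TopicPartition` so the example compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the state manager; only the checkpoint call is modeled.
interface CheckpointStore {
    void checkpoint(Map<String, Long> offsets);
}

// Hypothetical helper that records every checkpoint it is asked to write.
class RecordingCheckpointStore implements CheckpointStore {
    final List<Map<String, Long>> writes = new ArrayList<>();

    @Override
    public void checkpoint(final Map<String, Long> offsets) {
        writes.add(offsets);
    }
}

// Simplified task holding the two fields from the diff: `checkpoint` and `checkpointNeeded`.
class CheckpointingTask {
    private final CheckpointStore stateMgr;
    private Map<String, Long> checkpoint = null;
    private boolean checkpointNeeded = false;

    CheckpointingTask(final CheckpointStore stateMgr) {
        this.stateMgr = stateMgr;
    }

    // Remember the offsets and mark that a checkpoint write is pending.
    void scheduleCheckpoint(final Map<String, Long> checkpoint) {
        this.checkpoint = checkpoint;
        this.checkpointNeeded = true;
    }

    // The flag check lives here, so callers can invoke this unconditionally.
    void writeCheckpointIfNeeded() {
        if (checkpointNeeded) {
            stateMgr.checkpoint(checkpoint);
            checkpointNeeded = false;
        }
    }
}
```

   With this shape, calling `writeCheckpointIfNeeded()` before anything was scheduled is a no-op, and calling it twice after a single `scheduleCheckpoint(...)` writes only once, because the flag is reset after the write.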




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org