Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/09 21:51:35 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted

guozhangwang commented on a change in pull request #9247:
URL: https://github.com/apache/kafka/pull/9247#discussion_r485941828



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -662,4 +662,10 @@ public TopicPartition registeredChangelogPartitionFor(final String storeName) {
     public String changelogFor(final String storeName) {
         return storeToChangelogTopic.get(storeName);
     }
+
+    public void deleteCheckPointFile() throws IOException {

Review comment:
       nit: I'd suggest we just inline this function inside `StreamTask` since 1) this is only triggered with EOS enabled, and its name `deleteCheckPointFile` may be a bit misleading, and 2) it is a very simple function anyway.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -332,6 +332,15 @@ public void resume() {
             case SUSPENDED:
                 // just transit the state without any logical changes: suspended and restoring states
                 // are not actually any different for inner modules
+
+                // Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
+                try {
+                    stateMgr.deleteCheckPointFile();
+                    log.debug("Deleted check point file");

Review comment:
       nit: `log.debug("Deleted check point file upon resuming with EOS enabled");`
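Taken together, the two nits above could look roughly like the following. This is only a self-contained sketch, not the actual Kafka code: `ResumeSketch`, its `eosEnabled` flag, and the `checkpointFile` path are hypothetical stand-ins for `StreamTask` and its state, and `System.out.println` stands in for `log.debug`. The point is that the deletion is inlined into `resume()`, guarded by the EOS check, and logged with the more specific message.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for StreamTask; names and fields are assumptions for illustration.
final class ResumeSketch {
    enum State { SUSPENDED, RESTORING }

    private final boolean eosEnabled;   // assumption: the task knows whether EOS is enabled
    private final Path checkpointFile;  // assumption: location of the task's checkpoint file
    private State state = State.SUSPENDED;

    ResumeSketch(final boolean eosEnabled, final Path checkpointFile) {
        this.eosEnabled = eosEnabled;
        this.checkpointFile = checkpointFile;
    }

    State state() {
        return state;
    }

    void resume() throws IOException {
        if (state == State.SUSPENDED) {
            // Inlined deletion: only relevant with EOS enabled, so no separate
            // deleteCheckPointFile() helper is needed.
            if (eosEnabled && Files.deleteIfExists(checkpointFile)) {
                // Stand-in for log.debug(...)
                System.out.println("Deleted check point file upon resuming with EOS enabled");
            }
            // Transition the state; suspended and restoring are otherwise equivalent here.
            state = State.RESTORING;
        }
    }
}
```

With the guard inside `resume()`, a task without EOS leaves its checkpoint file untouched, while a task with EOS removes it before moving to the restoring state.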




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org