Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:06:23 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

guozhangwang commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r437090376



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) {
 
     void postCommit();
 
-    /**
-     * @throws TaskMigratedException all the task has been migrated
-     * @throws StreamsException fatal error, should close the thread
-     */
-    void prepareSuspend();
+    void suspendDirty();

Review comment:
       It seems to me that the reason for having two suspend methods, and for merging suspendClean with prepareCommit, is that for `StreamTask` we want to skip prepareCommit when the state is already `SUSPENDED`. I feel it is a tad cleaner to separate them further: keep a single `suspend` that never calls prepareCommit, and let prepareCommit itself decide whether it should do anything based on both the `state` (i.e. only running/restoring/suspended need to commit) and the `commitNeeded` flag.
   
   With that we can convert the callers as follows:
   
   1. suspendDirty(): just call suspend(), do not call prepareCommit().
   2. suspendCleanAndPrepareCommit(): 
   2.a) from `task.closeAndRecycleState`: call suspend(), and then call prepareCommit(); the latter would check `commitNeeded` and, if it is false, would not try to flush / commit. If the task has just transitioned from another state to suspended, `commitNeeded` should still be true, so the commit still happens.
   2.b) from `taskManager` directly: same as above, but this call is always followed by `committableOffsetsAndMetadata` to get the map of offsets, so I'm thinking we can merge `prepareCommit` with `committableOffsetsAndMetadata` as well: if the state is right and `commitNeeded` is set, execute the prepare-commit procedure and accumulate the offsets; otherwise return `null`, indicating that no offsets need to be committed.
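   
   To make the proposal concrete, here is a rough sketch of the shape I have in mind. It is not the real `Task` / `StreamTask` code: `SketchTask`, `start()`, `process()` and the `Map<String, Long>` offsets are hypothetical stand-ins (the real map would be keyed by `TopicPartition` with `OffsetAndMetadata` values), and the state handling is simplified.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a task; NOT the real Kafka Streams Task interface.
final class SketchTask {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.CREATED;
    private boolean commitNeeded = false;
    // Assumed shape for illustration: partition name -> next offset to commit.
    private final Map<String, Long> consumedOffsets = new HashMap<>();

    void start() {
        state = State.RUNNING;
    }

    // Processing a record makes a commit necessary.
    void process(final String partition, final long offset) {
        consumedOffsets.put(partition, offset + 1);
        commitNeeded = true;
    }

    // Single suspend: only changes state and never calls prepareCommit().
    // Simplification: other states are left untouched.
    void suspend() {
        if (state == State.RUNNING || state == State.RESTORING) {
            state = State.SUSPENDED;
        }
    }

    // prepareCommit merged with committableOffsetsAndMetadata: returns the
    // offsets to commit, or null when nothing needs to be committed.
    Map<String, Long> prepareCommit() {
        final boolean committableState = state == State.RUNNING
            || state == State.RESTORING
            || state == State.SUSPENDED;
        if (!committableState || !commitNeeded) {
            return null;
        }
        // A real task would flush its state stores and producer here.
        return new HashMap<>(consumedOffsets);
    }

    void postCommit() {
        commitNeeded = false;
    }
}
```

   With this shape, the dirty path calls only `suspend()`, while the clean path calls `suspend()` followed by `prepareCommit()` and skips the commit when the result is `null`.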

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) {
 
     void postCommit();
 
-    /**

Review comment:
       I'm wondering if we could merge `committableOffsetsAndMetadata` with `prepareCommit` as well, letting the latter return the map? See my other comment on this file.



