You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 20:48:21 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

guozhangwang commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434158390



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {

Review comment:
       Is this intentional to use switch and if/else in different functions?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {

Review comment:
       Out of the scope of this PR: why we need to return the checkpoint map to bookkeep at the `task-manager`? It seems we just re-distribute it in the `close` call.
   
   I think we do not need to expose the checkpoint map in task-manager eventually?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+            case CLOSED:
+                checkpoint = Collections.emptyMap();

Review comment:
       I think we should use `null` to indicate no need to write new checkpoint?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org