Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 06:55:45 UTC

[GitHub] [kafka] mjsax opened a new pull request #8776: KAFKA-9441: Improve Kafka Streams task management

mjsax opened a new pull request #8776:
URL: https://github.com/apache/kafka/pull/8776


    - make task manager agnostic to task state
    - make task state transitions idempotent


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435588028



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       Sure, can add a comment.







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433660276



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
         final List<Task> restoringTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {
-                try {
-                    task.initializeIfNeeded();
-                } catch (final LockException | TimeoutException e) {
-                    // it is possible that if there are multiple threads within the instance that one thread
-                    // trying to grab the task from the other, while the other has not released the lock since
-                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                    log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-                    allRunning = false;
-                }
+            try {
+                task.initializeIfNeeded();
+            } catch (final LockException | TimeoutException e) {
+                // it is possible that if there are multiple threads within the instance that one thread
+                // trying to grab the task from the other, while the other has not released the lock since
+                // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                allRunning = false;
             }
 
-            if (task.state() == RESTORING) {
+            if (task.isActive()) {

Review comment:
       `StandbyTask`s are never in the `RESTORING` state.







[GitHub] [kafka] abbccdda commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434107127



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1799,8 +1799,8 @@ public void shouldThrowIfClosingOnIllegalState() {
         task.closeClean(checkpoint);
 
         // close call are not idempotent since we are already in closed

Review comment:
       nit: call -> calls

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());

Review comment:
       We could just say `Skip closing since state is closed` here

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +542,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+            case CLOSED:

Review comment:
       Could we merge the `CLOSED` and `CREATED` cases? Also, could you elaborate on why we use an empty checkpoint map instead of `null`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       Should we also check `task.isActive` here?







[GitHub] [kafka] vvcephei commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434121892



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);

Review comment:
       I'd be in favor... I can see how the choice can go both ways on if/else vs. switch, but at the end of the day, it's nice just to do the same thing everywhere. I think switch tends to be nicer when you need to enumerate most/all of the cases, which seems to be the direction we're heading here.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434118800



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);

Review comment:
       I see. I guess personally I'd prefer to use a `switch` and just case all states explicitly, to make it clear when an action is a no-op for a certain state







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434217037



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+            case CLOSED:
+                checkpoint = Collections.emptyMap();

Review comment:
       Was not 100% sure -- could `null` not lead to a wipeout of an existing checkpoint file? Writing the checkpoint again should be idempotent?
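
       A minimal sketch of the convention under discussion, where `null` means "do not write a checkpoint" and any non-null map (even an empty one) triggers a rewrite. `CheckpointSketch`, `track` and `maybeCheckpoint` are hypothetical names, and partitions are plain strings. How the real `stateMgr.checkpoint(...)` treats an empty map is not shown in this thread; the sketch assumes it rewrites the offsets the state manager already tracks.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state manager; not the Kafka Streams API.
final class CheckpointSketch {
    // Offsets the sketch's state manager already tracks.
    private final Map<String, Long> tracked = new HashMap<>();
    // Stand-in for the checkpoint file contents; null means no file was written.
    private Map<String, Long> file = null;
    // Number of times the "file" was written.
    private int writes = 0;

    void track(final String partition, final long offset) {
        tracked.put(partition, offset);
    }

    // null: skip writing. Non-null (even empty): merge into tracked offsets and rewrite.
    void maybeCheckpoint(final Map<String, Long> checkpoint) {
        if (checkpoint == null) {
            return;
        }
        final Map<String, Long> merged = new HashMap<>(tracked);
        merged.putAll(checkpoint);
        file = merged;
        writes++;
    }

    Map<String, Long> file() {
        return file;
    }

    int writes() {
        return writes;
    }

    public static void main(final String[] args) {
        final CheckpointSketch sketch = new CheckpointSketch();
        sketch.track("changelog-0", 5L);
        sketch.maybeCheckpoint(null);                   // nothing is written
        sketch.maybeCheckpoint(Collections.emptyMap()); // tracked offsets are written
        sketch.maybeCheckpoint(Collections.emptyMap()); // same contents are written again
        System.out.println(sketch.file() + " after " + sketch.writes() + " writes");
    }
}
```

       Under these assumptions, repeating the empty-map call rewrites identical contents, so the file is the same after one write or many, while `null` leaves any existing file untouched.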







[GitHub] [kafka] mjsax commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#issuecomment-638530915


   Updated this PR. Call for review.





[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433659050



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {

Review comment:
       Align code style to use `switch` if all states are used







[GitHub] [kafka] mjsax commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#issuecomment-639665412


   Merged to `trunk` and cherry-picked to `2.6`.





[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435589707



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {
+                return Task.LATEST_OFFSET;
+            } else {
+                offsetSum += offset;

Review comment:
       Ack







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433662426



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -2637,11 +2646,17 @@ private File getCheckpointFile(final TaskId task) {
         public void initializeIfNeeded() {
             if (state() == State.CREATED) {
                 transitionTo(State.RESTORING);
+                if (!active) {
+                    transitionTo(State.RUNNING);
+                }
             }
         }
 
         @Override
         public void completeRestoration() {
+            if (state() == State.RUNNING) {

Review comment:
       Must be idempotent.
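
       A self-contained sketch of what idempotent transitions could look like for the mock in the diff above. `IdempotentTaskSketch` and its `transitions` counter are hypothetical and exist only to make repeated calls observable; the reduced `State` enum is an assumption of the sketch.

```java
// Hypothetical mock task; mirrors the shape of the test helper in the diff.
final class IdempotentTaskSketch {
    enum State { CREATED, RESTORING, RUNNING }

    private final boolean active;
    private State state = State.CREATED;
    // Counts real transitions, for illustration only.
    private int transitions = 0;

    IdempotentTaskSketch(final boolean active) {
        this.active = active;
    }

    State state() {
        return state;
    }

    int transitions() {
        return transitions;
    }

    private void transitionTo(final State next) {
        state = next;
        transitions++;
    }

    void initializeIfNeeded() {
        if (state == State.CREATED) {
            transitionTo(State.RESTORING);
            if (!active) {
                // As in the diff, a non-active task moves straight on to RUNNING.
                transitionTo(State.RUNNING);
            }
        }
        // Any other state: already initialized, so the call is a no-op.
    }

    void completeRestoration() {
        if (state == State.RUNNING) {
            return; // repeated call is a no-op
        }
        if (state == State.RESTORING) {
            transitionTo(State.RUNNING);
            return;
        }
        throw new IllegalStateException("Illegal state " + state + " while completing restoration");
    }

    public static void main(final String[] args) {
        final IdempotentTaskSketch task = new IdempotentTaskSketch(true);
        task.initializeIfNeeded();
        task.initializeIfNeeded();  // no additional transition
        task.completeRestoration();
        task.completeRestoration(); // no additional transition
        System.out.println(task.state() + " after " + task.transitions() + " transitions");
    }
}
```

       Because each method checks the current state before acting, a caller can invoke it without first inspecting the task state.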







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434220229



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);

Review comment:
       I updated most of them. I only left a few for which we check a single state (it seems overkill to use `switch()` for this case). Is that OK with you?







[GitHub] [kafka] mjsax commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#issuecomment-637327785


   Call for review @guozhangwang @abbccdda @vvcephei @ableegoldman 





[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433661972



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       If an active task is `RUNNING`, the offsets are set to `LATEST_OFFSET` in `task.changelogOffsets()`, which is passed as a parameter.
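
       A minimal sketch of how the sentinel check might combine with the summation. Partition names are plain strings instead of `TopicPartition`, the `LATEST_OFFSET` value is a placeholder for `Task.LATEST_OFFSET`, and keeping the overflow pinning from the removed lines is an assumption, since the hunk above is truncated.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified version of the summation; not the TaskManager code.
final class ChangelogOffsetSum {
    // Placeholder sentinel for this sketch; the real constant is Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (final Map.Entry<String, Long> entry : changelogOffsets.entrySet()) {
            final long offset = entry.getValue();
            if (offset == LATEST_OFFSET) {
                // A running active task reports the sentinel; propagate it unchanged.
                return LATEST_OFFSET;
            }
            offsetSum += offset;
            if (offsetSum < 0) {
                // Pin to Long.MAX_VALUE on overflow, as in the removed lines of the hunk.
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }

    public static void main(final String[] args) {
        final Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("changelog-0", 10L);
        offsets.put("changelog-1", 32L);
        System.out.println(sumOfChangelogOffsets(offsets)); // prints 42
    }
}
```

       Returning the sentinel as soon as it is seen keeps it from being added into the sum, where it would otherwise distort the total.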







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434117796



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
         final List<Task> restoringTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {
-                try {
-                    task.initializeIfNeeded();
-                } catch (final LockException | TimeoutException e) {
-                    // it is possible that if there are multiple threads within the instance that one thread
-                    // trying to grab the task from the other, while the other has not released the lock since
-                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                    log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-                    allRunning = false;
-                }
+            try {
+                task.initializeIfNeeded();
+            } catch (final LockException | TimeoutException e) {
+                // it is possible that if there are multiple threads within the instance that one thread
+                // trying to grab the task from the other, while the other has not released the lock since
+                // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                allRunning = false;
             }
 
-            if (task.state() == RESTORING) {
+            if (task.isActive()) {

Review comment:
       Ack. -- I think this needs further refactoring... Will do in a follow-up PR. Renaming for now.







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434111426



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);

Review comment:
       The state could be `RESTORING`, which is _illegal_ but not _unknown_ -- we would need more conditions to distinguish the two cases (introducing `switch()` would be helpful for this case). Thoughts?
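
       A sketch of how a `switch` could separate the two cases. `CloseSwitchSketch` is hypothetical, and which states count as closable (`CREATED`, `RUNNING`, `SUSPENDED` here) is an assumption of the sketch, not something this thread establishes for `StandbyTask`.

```java
// Hypothetical close logic; the state grouping is assumed for illustration.
final class CloseSwitchSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Returns true if close work was done, false if the call was a no-op.
    static boolean close(final State state) {
        switch (state) {
            case CREATED:
            case RUNNING:
            case SUSPENDED:
                // Assumed closable states; the real close logic would run here.
                return true;

            case CLOSED:
                // Already closed: an explicit no-op keeps the call idempotent.
                return false;

            case RESTORING:
                // A known state in which closing is not allowed.
                throw new IllegalStateException("Illegal state " + state + " while closing standby task");

            default:
                // Only reachable if a new State constant is added without updating this switch.
                throw new IllegalStateException("Unknown state " + state + " while closing standby task");
        }
    }

    public static void main(final String[] args) {
        System.out.println(close(State.RUNNING)); // prints true
        System.out.println(close(State.CLOSED));  // prints false
    }
}
```

       Listing every state makes no-ops and illegal states visible at a glance, and the `default` branch is left only for states the method does not know about.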







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433659277



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {
+            case CREATED:
+            case SUSPENDED:
+                // do nothing
+                log.trace("Skip prepare suspending since state is {}", state());
 
-            stateMgr.flush();
-            recordCollector.flush();
+                break;
 
-            log.info("Prepare suspending running");
-        } else if (state() == State.RESTORING) {
-            stateMgr.flush();
+            case RESTORING:
+                stateMgr.flush();
+                log.info("Prepare suspending restoring");
 
-            log.info("Prepare suspending restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+                break;
+
+            case RUNNING:
+                closeTopology(true);
+
+                stateMgr.flush();
+                recordCollector.flush();
+
+                log.info("Prepare suspending running");
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);

Review comment:
       Minor: improve error message

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {
+            case CREATED:
+            case SUSPENDED:
+                // do nothing
+                log.trace("Skip prepare suspending since state is {}", state());
 
-            stateMgr.flush();
-            recordCollector.flush();
+                break;
 
-            log.info("Prepare suspending running");
-        } else if (state() == State.RESTORING) {
-            stateMgr.flush();
+            case RESTORING:
+                stateMgr.flush();
+                log.info("Prepare suspending restoring");
 
-            log.info("Prepare suspending restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+                break;
+
+            case RUNNING:
+                closeTopology(true);
+
+                stateMgr.flush();
+                recordCollector.flush();
+
+                log.info("Prepare suspending running");
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);
         }
     }
 
     @Override
     public void suspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            stateMgr.checkpoint(checkpointableOffsets());
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended running");
-        } else if (state() == State.RESTORING) {
-            // we just checkpoint the position that we've restored up to without
-            // going through the commit process
-            stateMgr.checkpoint(emptyMap());
-
-            // we should also clear any buffered records of a task when suspending it
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+        switch (state()) {
+            case CREATED:
+            case SUSPENDED:
+                // do nothing
+                log.trace("Skip suspending since state is {}", state());
+
+                break;
+
+            case RUNNING:
+                stateMgr.checkpoint(checkpointableOffsets());
+                partitionGroup.clear();
+
+                transitionTo(State.SUSPENDED);
+                log.info("Suspended running");
+
+                break;
+
+            case RESTORING:
+                // we just checkpoint the position that we've restored up to without
+                // going through the commit process
+                stateMgr.checkpoint(emptyMap());
+
+                // we should also clear any buffered records of a task when suspending it
+                partitionGroup.clear();
+
+                transitionTo(State.SUSPENDED);
+                log.info("Suspended restoring");
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);

Review comment:
       Minor: improve error message







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434215917



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {

Review comment:
       Not 100% sure what the end state will be atm, but yes, we should be able to simplify this further in follow up PRs.







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433662342



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -2637,11 +2646,17 @@ private File getCheckpointFile(final TaskId task) {
         public void initializeIfNeeded() {
             if (state() == State.CREATED) {
                 transitionTo(State.RESTORING);
+                if (!active) {
+                    transitionTo(State.RUNNING);

Review comment:
       A "standby" must transition to `RUNNING` here (cf. `StandbyTask`)







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433660588



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -529,11 +524,7 @@ void handleLostAll() {
         for (final TaskId id : lockedTaskDirectories) {
             final Task task = tasks.get(id);
             if (task != null) {
-                if (task.isActive() && task.state() == RUNNING) {
-                    taskOffsetSums.put(id, Task.LATEST_OFFSET);
-                } else {
-                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
-                }
+                taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));

Review comment:
       Make TM agnostic to task state by putting some more logic into `sumOfChangelogOffsets`. Note that `task.changelogOffsets()` sets offsets to `LATEST_OFFSET` for `StreamTask`s that are `RUNNING`.







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434216773



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +542,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+            case CLOSED:

Review comment:
       Not sure if we can merge CLOSED and CREATED, but I plan to do follow-up PRs to change state handling further. Hence, I would like to keep it out of scope for this PR.
   
   `emptyMap()` is not an empty checkpoint: the map we return is additional data we write into the checkpoint. `null`, on the other hand, means to _not_ write any checkpoint, but in a clean-close case we do want to write one.







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434110314



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {

Review comment:
       We could... (cf. comment below)







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434215315



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {

Review comment:
       I think in general it's better to use `switch`, and I never changed from `switch` to `if/else`, but I did not update all methods either (if we only check for a single state, it seems overkill to use `switch`?).







[GitHub] [kafka] vvcephei commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435523829



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {
+                return Task.LATEST_OFFSET;
+            } else {
+                offsetSum += offset;

Review comment:
       It might be nice to have a sanity check here that `offset` is non-negative, since that would indicate we've unexpectedly received a sentinel value. I thought we did that already, but it's obviously not here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       This seems pretty subtle, can you convert your GH explanation into a code comment?
   
   It also seems worth mentioning that if any changelog offset in the task is "latest", we assume the whole task is active and therefore return "latest". Took me a minute to work that out.
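
   To make the two suggestions above concrete, here is a minimal sketch of how the loop could look with the sentinel short-circuit, a non-negative sanity check, and the overflow pinning from the removed lines. It is not the merged code: the class name, the `String` keys standing in for `TopicPartition`, and the sentinel value `-2L` are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for the TaskManager logic under review; not the real class.
final class OffsetSumSketch {
    // Assumed sentinel value for illustration; the real constant is Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (final Map.Entry<String, Long> entry : changelogOffsets.entrySet()) {
            final long offset = entry.getValue();

            if (offset == LATEST_OFFSET) {
                // If any changelog reports "latest", the whole task is assumed to be
                // active and running, so the sentinel is returned instead of a sum.
                return LATEST_OFFSET;
            }
            if (offset < 0) {
                // Sanity check: any other negative value is an unexpected sentinel.
                throw new IllegalStateException(
                    "Unexpected negative offset " + offset + " for " + entry.getKey());
            }

            offsetSum += offset;
            if (offsetSum < 0) {
                // Overflow: pin to Long.MAX_VALUE, as the removed lines did.
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }
}
```

   Throwing on an unexpected negative offset is one possible choice; logging and skipping the entry would be another.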







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433659175



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {
+            case CREATED:
+            case SUSPENDED:
+                // do nothing
+                log.trace("Skip prepare suspending since state is {}", state());
 
-            stateMgr.flush();
-            recordCollector.flush();
+                break;
 
-            log.info("Prepare suspending running");
-        } else if (state() == State.RESTORING) {
-            stateMgr.flush();
+            case RESTORING:
+                stateMgr.flush();
+                log.info("Prepare suspending restoring");
 
-            log.info("Prepare suspending restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+                break;
+
+            case RUNNING:
+                closeTopology(true);
+
+                stateMgr.flush();
+                recordCollector.flush();
+
+                log.info("Prepare suspending running");
+
+                break;
+
+            case CLOSED:
+                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+
+            default:
+                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);
         }
     }
 
     @Override
     public void suspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            stateMgr.checkpoint(checkpointableOffsets());
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended running");
-        } else if (state() == State.RESTORING) {
-            // we just checkpoint the position that we've restored up to without
-            // going through the commit process
-            stateMgr.checkpoint(emptyMap());
-
-            // we should also clear any buffered records of a task when suspending it
-            partitionGroup.clear();
-
-            transitionTo(State.SUSPENDED);
-            log.info("Suspended restoring");
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
+        switch (state()) {

Review comment:
       Align code style to use `switch` if all states are used







[GitHub] [kafka] guozhangwang commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434892254



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+            case CLOSED:
+                checkpoint = Collections.emptyMap();

Review comment:
       I think `null` and `emptyMap` have different semantics: the former indicates "do not try to override the checkpoint file", while the latter indicates "just write the checkpoint file as of the current state store maintained offset". I.e., in `stateMgr.checkpoint(writtenOffsets)`, if the map is empty, we would still write the checkpoint file, but based only on each store’s current `storeMetadata.offset`.
   
   So back to `prepareClose`: if we are in CREATED, meaning we’ve read the checkpoint file into the store, we still need to write those loaded offsets back to the file; in SUSPENDED we know we’ve already written the offsets to the checkpoint file when transiting to that state, so we can return `null` to indicate there is no need to write again.
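
   The distinction can be sketched roughly as follows. This is a simplified model, not the real state manager: `CheckpointSketch`, its fields, and `maybeCheckpoint` are hypothetical names, and the "file" is just an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the checkpoint contract described above.
final class CheckpointSketch {
    // Offsets the stores currently track in memory (stand-in for storeMetadata.offset).
    final Map<String, Long> storeOffsets = new HashMap<>();
    // Simulated checkpoint file contents; null means nothing has been written yet.
    Map<String, Long> checkpointFile = null;
    int writes = 0;

    // null: leave the file untouched.
    // Empty map: write the file from the stores' current offsets.
    // Non-empty map: the given offsets override the stores' offsets before writing.
    void maybeCheckpoint(final Map<String, Long> writtenOffsets) {
        if (writtenOffsets == null) {
            return;
        }
        storeOffsets.putAll(writtenOffsets);
        checkpointFile = new HashMap<>(storeOffsets);
        writes++;
    }
}
```

   Under this model, a task in CREATED that passes an empty map still rewrites the offsets it loaded, while a task in SUSPENDED that passes `null` skips the write.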







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434217796



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       Not necessary IMHO, because `offset == Task.LATEST_OFFSET` can only be true if the task is active. For Standbys the offset is never set to `LATEST_OFFSET` (cf. `StreamTask#changelogOffsets()` and `StandbyTask#changelogOffsets()`)







[GitHub] [kafka] mjsax merged pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #8776:
URL: https://github.com/apache/kafka/pull/8776


   





[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434111426



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);

Review comment:
       The state could be `RESTORING`, which is _illegal_ but not _unknown_. We would need more conditions to distinguish the two cases (introducing `switch()` would be helpful here). Thoughts?
   
   I guess this applies to other places in the code, too. I am happy to update all of them; I was just hesitant.
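
   As an illustration of what a `switch` buys here, the sketch below separates a known-but-illegal state from a genuinely unknown one. The enum, the method, and the choice of which states are closable are assumptions for the example, not the actual `StandbyTask` code.

```java
// Hypothetical, simplified state handling for illustration only.
final class CloseStateSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Returns true if close work was performed, false if the call was a no-op.
    static boolean closeStandby(final State state, final String id) {
        switch (state) {
            case CREATED:
            case RUNNING:
                // Assumed closable states; the real task would close its state manager here.
                return true;

            case CLOSED:
                // Idempotent: closing an already closed task does nothing.
                return false;

            case RESTORING:
            case SUSPENDED:
                // Known states that are assumed illegal for this call.
                throw new IllegalStateException(
                    "Illegal state " + state + " while closing standby task " + id);

            default:
                // Only reachable if a new enum constant is added without updating this method.
                throw new IllegalStateException(
                    "Unknown state " + state + " while closing standby task " + id);
        }
    }
}
```

   With `if/else`, both cases end up in the same trailing `else` branch and share one error message.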







[GitHub] [kafka] guozhangwang commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434158390



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {

Review comment:
       Is it intentional to use `switch` and `if/else` in different functions?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {

Review comment:
       Out of scope for this PR: why do we need to return the checkpoint map for bookkeeping in the `task-manager`? It seems we just hand it back to the task in the `close` call.
   
   I think we should eventually not need to expose the checkpoint map in the task-manager?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -493,28 +549,45 @@ public void closeAndRecycleState() {
     private Map<TopicPartition, Long> prepareClose(final boolean clean) {
         final Map<TopicPartition, Long> checkpoint;
 
-        if (state() == State.CREATED) {
-            // the task is created and not initialized, just re-write the checkpoint file
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.RUNNING) {
-            closeTopology(clean);
+        switch (state()) {
+            case CREATED:
+                // the task is created and not initialized, just re-write the checkpoint file
+                checkpoint = Collections.emptyMap();
 
-            if (clean) {
-                stateMgr.flush();
-                recordCollector.flush();
-                checkpoint = checkpointableOffsets();
-            } else {
+                break;
+
+            case RUNNING:
+                closeTopology(clean);
+
+                if (clean) {
+                    stateMgr.flush();
+                    recordCollector.flush();
+                    checkpoint = checkpointableOffsets();
+                } else {
+                    checkpoint = null; // `null` indicates to not write a checkpoint
+                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                }
+
+                break;
+
+            case RESTORING:
+                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
+                checkpoint = Collections.emptyMap();
+
+                break;
+
+            case SUSPENDED:
+                // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
                 checkpoint = null; // `null` indicates to not write a checkpoint
-                executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
-            }
-        } else if (state() == State.RESTORING) {
-            executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-            checkpoint = Collections.emptyMap();
-        } else if (state() == State.SUSPENDED) {
-            // if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state
-            checkpoint = null; // `null` indicates to not write a checkpoint
-        } else {
-            throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id);
+
+                break;
+            case CLOSED:
+                checkpoint = Collections.emptyMap();

Review comment:
       I think we should use `null` to indicate no need to write new checkpoint?










[GitHub] [kafka] mjsax commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#issuecomment-639154057


   > I'm mildly concerned that we're using the state to implement idempotence of processing steps, but the states aren't 1:1 with the steps. For example, when we go from RUNNING to CLOSED, we transition through both prepareClose() and close(). But there's no CLOSE_PREPARED state, so there's no way to really differentiate that specifically prepareClose() has completed before, but not close().
   
   Correct. Note that there will be more follow-up PRs that will remove, e.g., `prepareClose()`, so this will be addressed soon.
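
A minimal sketch of the concern quoted above, not the actual StreamTask code: the class name, the two-state enum, and the call counter are all hypothetical. It shows that when `prepareClose()` leaves the state at RUNNING, the state alone cannot tell a second `prepareClose()` call apart from the first, whereas calls after `close()` can be skipped.

```java
final class PrepareCloseSketch {
    // Hypothetical, simplified state set; there is deliberately no CLOSE_PREPARED state.
    enum State { RUNNING, CLOSED }

    private State state = State.RUNNING;
    private int prepareCloseWork = 0; // counts how often the "prepare" work actually ran

    State state() {
        return state;
    }

    int prepareCloseWork() {
        return prepareCloseWork;
    }

    void prepareClose() {
        switch (state) {
            case RUNNING:
                // The state stays RUNNING, so a repeated call cannot be detected from the state.
                prepareCloseWork++;
                break;
            case CLOSED:
                // Already closed: nothing to prepare.
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    void close() {
        state = State.CLOSED;
    }
}
```

Under these assumptions, calling `prepareClose()` twice before `close()` performs the prepare work twice, which is the gap a follow-up that removes the separate prepare step would close.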





[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435552502



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {
+                return Task.LATEST_OFFSET;
+            } else {
+                offsetSum += offset;

Review comment:
       We put the check for negative offsets in the RecordCollector, but I guess it doesn't hurt to check again here?







[GitHub] [kafka] guozhangwang commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435636306



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -247,48 +251,75 @@ public void completeRestoration() {
      */
     @Override
     public void prepareSuspend() {
-        if (state() == State.CREATED || state() == State.SUSPENDED) {
-            // do nothing
-            log.trace("Skip prepare suspending since state is {}", state());
-        } else if (state() == State.RUNNING) {
-            closeTopology(true);
+        switch (state()) {

Review comment:
       Yup, makes sense, just curious :)







[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435555000



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       Well, this is by definition of the sentinel `LATEST_OFFSET` -- but I agree it's subtle that if any offset is "latest" then we know they all are, i.e., the task is active and RUNNING.
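
A hedged sketch of the sentinel short-circuit discussed here, written as a standalone class rather than the real `TaskManager` method. The class name, the `String` keys standing in for `TopicPartition`, the sentinel value `-2`, and throwing on other negative offsets are assumptions for illustration; only the early return on the sentinel and the pinning to `Long.MAX_VALUE` on overflow come from the diff.

```java
import java.util.Map;

final class OffsetSumSketch {
    // Assumed sentinel meaning "active and RUNNING"; the concrete value is illustrative.
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (final Map.Entry<String, Long> entry : changelogOffsets.entrySet()) {
            final long offset = entry.getValue();

            if (offset == LATEST_OFFSET) {
                // If any changelog reports "latest", the whole task is treated as caught up.
                return LATEST_OFFSET;
            }
            if (offset < 0) {
                // Defensive check (assumed behavior): any other negative offset is invalid.
                throw new IllegalStateException("Negative offset " + offset + " for " + entry.getKey());
            }

            offsetSum += offset;
            if (offsetSum < 0) {
                // Adding two non-negative longs wrapped around: pin to Long.MAX_VALUE.
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }
}
```

Checking the sentinel before accumulating matters: adding a negative sentinel into the running sum would otherwise be indistinguishable from an overflow.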







[GitHub] [kafka] guozhangwang commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#issuecomment-639196728


   test this please





[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433659711



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -59,14 +58,9 @@
      *          |            |                      |         |
      *          |            |                      |         |
      *          |            v                      |         |
-     *          |      +-----+-------+              |         |
-     *          +----> | Closing (4) | <------------+         |
-     *                 +-----+-------+                        |
-     *                       |                                |
-     *                       v                                |
-     *                 +-----+-------+                        |
-     *                 | Closed (5)  | -----------------------+
-     *                 +-------------+
+     *          |      +-----+-------+ <------------+         |
+     *          +----> | Closed (4)  |                        |
+     *                 +-------------+ <----------------------+

Review comment:
       State "closing" was removed in a previous PR already; just updating the comment







[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433659955



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
         final List<Task> restoringTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {

Review comment:
       Making tasks idempotent; this check is not required any longer.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434051190



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {

Review comment:
       Should we switch to `switch` here as well? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
      */
     @Override
     public void completeRestoration() {
+        if (state() == State.RUNNING) {
+            return;
+        }
+

Review comment:
       Can we use `if`/`else if` here for consistency?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -466,6 +510,11 @@ public void closeAndRecycleState() {
                 stateMgr.recycle();
                 recordCollector.close();
                 break;
+
+            case CLOSED:
+                log.trace("Skip close since state is {}", state());

Review comment:
       I think this might be one of those exceptions where we should still enforce that the state is not `CLOSED` (ie throw `IllegalStateException`) since there are related actions that occur outside of the Task implementation that will fail if we try to recycle a CLOSED task. Similar to prepare/post commit, resume, etc
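
To illustrate the distinction being suggested, here is a simplified sketch, not the real StreamTask: the class name and the reduced state set are hypothetical, and which states may legally be recycled is an assumption. A plain `close()` is a no-op on an already CLOSED task, while `closeAndRecycleState()` still rejects CLOSED with an `IllegalStateException`.

```java
final class RecycleSketch {
    // Hypothetical, reduced state set for illustration.
    enum State { CREATED, RUNNING, SUSPENDED, CLOSED }

    private State state;

    RecycleSketch(final State initial) {
        this.state = initial;
    }

    State state() {
        return state;
    }

    // Idempotent: closing an already CLOSED task is skipped.
    void close() {
        switch (state) {
            case CREATED:
            case RUNNING:
            case SUSPENDED:
                state = State.CLOSED;
                break;
            case CLOSED:
                break; // skip close since the task is already closed
            default:
                throw new IllegalStateException("Unknown state " + state + " while closing task");
        }
    }

    // Not idempotent: callers do related work outside the task that would fail for a CLOSED task.
    void closeAndRecycleState() {
        switch (state) {
            case CREATED:
            case RUNNING:
            case SUSPENDED:
                state = State.CLOSED;
                break;
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state + " while recycling task");
            default:
                throw new IllegalStateException("Unknown state " + state + " while recycling task");
        }
    }
}
```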

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
         final List<Task> restoringTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {
-                try {
-                    task.initializeIfNeeded();
-                } catch (final LockException | TimeoutException e) {
-                    // it is possible that if there are multiple threads within the instance that one thread
-                    // trying to grab the task from the other, while the other has not released the lock since
-                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                    log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-                    allRunning = false;
-                }
+            try {
+                task.initializeIfNeeded();
+            } catch (final LockException | TimeoutException e) {
+                // it is possible that if there are multiple threads within the instance that one thread
+                // trying to grab the task from the other, while the other has not released the lock since
+                // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                allRunning = false;
             }
 
-            if (task.state() == RESTORING) {
+            if (task.isActive()) {

Review comment:
       Can we add a comment or rename `restoringTasks` to clarify that it's ok to put an active-but-not-restoring task in here since `Task#completeRestoration` is idempotent/no-op for RUNNING tasks?
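
A small sketch of why this is safe, assuming `completeRestoration()` is a no-op for RUNNING tasks as the diff suggests. The class names, the two-state enum, and the counter are hypothetical and only serve to make the behavior observable.

```java
import java.util.List;

final class RestorationSketch {
    // Hypothetical, reduced state set for an active task.
    enum State { RESTORING, RUNNING }

    static final class ActiveTask {
        private State state;
        private int restorationsCompleted = 0; // counts real RESTORING -> RUNNING transitions

        ActiveTask(final State initial) {
            this.state = initial;
        }

        State state() {
            return state;
        }

        int restorationsCompleted() {
            return restorationsCompleted;
        }

        void completeRestoration() {
            switch (state) {
                case RUNNING:
                    return; // idempotent: already running, nothing to do
                case RESTORING:
                    restorationsCompleted++;
                    state = State.RUNNING;
                    return;
                default:
                    throw new IllegalStateException("Unknown state " + state);
            }
        }
    }

    // The caller no longer filters on RESTORING; it passes every active task.
    static void completeAll(final List<ActiveTask> activeTasks) {
        for (final ActiveTask task : activeTasks) {
            task.completeRestoration();
        }
    }
}
```

Because the task itself decides whether there is work to do, a list that mixes restoring and already-running active tasks can be processed repeatedly without side effects.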

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
      */
     @Override
     public void completeRestoration() {
+        if (state() == State.RUNNING) {
+            return;
+        }
+
         if (state() == State.RESTORING) {
             initializeMetadata();
             initializeTopology();

Review comment:
       GitHub won't let me leave a comment below this line, but can we use the `"Illegal state"`/`"Unknown state"` improvement in this method as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);

Review comment:
       `Illegal state` -> `Unknown state`? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       nice







[GitHub] [kafka] vvcephei commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435559860



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {
+                return Task.LATEST_OFFSET;
+            } else {
+                offsetSum += offset;

Review comment:
       Aha! Thanks. Yeah, I'd be in favor of coding defensively here as well.



