Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:06:01 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

mjsax commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r436864561



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -108,13 +107,20 @@ public void completeRestoration() {
     }
 
     @Override
-    public void prepareSuspend() {
-        log.trace("No-op prepareSuspend with state {}", state());
+    public void suspendDirty() {
+        log.trace("No-op suspend dirty with state {}", state());
+        if (state() == State.RUNNING) {
+            transitionTo(State.SUSPENDED);
+        }
     }
 
     @Override
-    public void suspend() {
-        log.trace("No-op suspend with state {}", state());
+    public void suspendCleanAndPrepareCommit() {
+        log.trace("No-op suspend clean with state {}", state());
+        if (state() == State.RUNNING) {
+            transitionTo(State.SUSPENDED);
+        }
+        prepareCommit();

Review comment:
       When we suspend, we _always_ want to commit.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -151,69 +157,23 @@ public void postCommit() {
         }
     }
 
-    @Override
-    public void prepareCloseClean() {
-        prepareClose(true);
-
-        log.info("Prepared clean close");
-    }
-
-    @Override
-    public void prepareCloseDirty() {
-        prepareClose(false);
-
-        log.info("Prepared dirty close");
-    }
-
-    /**
-     * 1. commit if we are running and clean close;
-     * 2. close the state manager.
-     *
-     * @throws TaskMigratedException all the task has been migrated
-     * @throws StreamsException fatal error, should close the thread
-     */
-    private void prepareClose(final boolean clean) {

Review comment:
       This logic is now handled in `suspendCleanAndPrepareCommit()`, which must be called before a task can be closed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -246,82 +245,39 @@ public void completeRestoration() {
         }
     }
 
-    /**
-     * <pre>
-     * the following order must be followed:
-     *  1. first close topology to make sure all cached records in the topology are processed
-     *  2. then flush the state, send any left changelog records
-     *  3. then flush the record collector
-     *  4. then commit the record collector -- for EOS this is the synchronization barrier
-     *  5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
-     * </pre>
-     *
-     * @throws TaskMigratedException if committing offsets failed (non-EOS)
-     *                               or if the task producer got fenced (EOS)
-     */
     @Override
-    public void prepareSuspend() {

Review comment:
       This is now done via `suspendCleanAndPrepareCommit()`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -246,82 +245,39 @@ public void completeRestoration() {
         }
     }
 
-    /**
-     * <pre>
-     * the following order must be followed:
-     *  1. first close topology to make sure all cached records in the topology are processed
-     *  2. then flush the state, send any left changelog records
-     *  3. then flush the record collector
-     *  4. then commit the record collector -- for EOS this is the synchronization barrier
-     *  5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
-     * </pre>
-     *
-     * @throws TaskMigratedException if committing offsets failed (non-EOS)
-     *                               or if the task producer got fenced (EOS)
-     */
     @Override
-    public void prepareSuspend() {
-        switch (state()) {
-            case CREATED:
-            case SUSPENDED:
-                // do nothing
-                log.trace("Skip prepare suspending since state is {}", state());
-
-                break;
-
-            case RESTORING:
-                stateMgr.flush();
-                log.info("Prepare suspending restoring");
-
-                break;
-
-            case RUNNING:
-                closeTopology(true);
-
-                stateMgr.flush();
-                recordCollector.flush();
-
-                log.info("Prepare suspending running");
-
-                break;
-
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);
-        }
+    public void suspendDirty() {
+        log.info("Suspending dirty");
+        suspend(false);
     }
 
     @Override
-    public void suspend() {
+    public void suspendCleanAndPrepareCommit() {
+        log.info("Suspending clean");
+        suspend(true);
+    }
+
+    @SuppressWarnings("fallthrough")
+    private void suspend(final boolean clean) {

Review comment:
       The "old" suspend() was called _after_ committing, the "new" suspend() is now called _before_ committing!
   
   The old suspend logic is now handled via `postCommit` and `close`
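
   For illustration, a minimal, self-contained sketch of the new ordering described above. The class below is a toy stand-in, not the real `StreamTask`/`StandbyTask`, and the commit step is only indicated by a comment:

```java
// Toy model of the new lifecycle ordering: suspend first, then commit, then post-commit, then close.
public class SuspendOrderingSketch {

    enum State { RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;

    // New flow (this PR): suspend first, then prepare the commit.
    void suspendCleanAndPrepareCommit() {
        state = State.SUSPENDED;
        System.out.println("suspended, then prepared commit (flushed state and records)");
    }

    // After the offsets were committed, postCommit() covers part of what the old suspend() did.
    void postCommit() {
        System.out.println("post-commit: wrote checkpoint if scheduled, cleared buffered records");
    }

    // close() covers the rest of the old suspend() duties.
    void close() {
        state = State.CLOSED;
        System.out.println("closed");
    }

    public static void main(final String[] args) {
        final SuspendOrderingSketch task = new SuspendOrderingSketch();
        task.suspendCleanAndPrepareCommit(); // 1. suspend + prepare commit (was: prepareSuspend()/prepareCommit())
        // 2. commit the prepared offsets here (done by the caller / consumer, omitted)
        task.postCommit();                   // 3. then post-commit
        task.close();                        // 4. only now close
    }
}
```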

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -369,6 +325,8 @@ public void prepareCommit() {
         switch (state()) {
             case RUNNING:
             case RESTORING:
+            case SUSPENDED:
+                maybeScheduleCheckpoint();

Review comment:
       Instead of "blindly" writing a checkpoint in `postCommit()`, we only do it if a checkpoint get's scheduled.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -423,39 +377,51 @@ public void postCommit() {
 
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
-        if (state() == State.CLOSED) {

Review comment:
       Just a rewrite to use `switch` now.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -423,39 +377,51 @@ public void postCommit() {
 
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
-        if (state() == State.CLOSED) {

Review comment:
       Mainly a rewrite to use `switch` -- however, we now return a proper non-empty offset map in the `SUSPENDED` state.
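
   A sketch of the switch-based shape only; the real method builds a `Map<TopicPartition, OffsetAndMetadata>` from the consumed offsets, which is replaced here by a placeholder map, and apart from `SUSPENDED` returning real offsets and `CLOSED` throwing, the per-state return values are illustrative guesses:

```java
import java.util.Collections;
import java.util.Map;

// Sketch of a switch-based committableOffsetsAndMetadata(); the offset bookkeeping
// of the real StreamTask is replaced by a placeholder map.
public class CommittableOffsetsSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.SUSPENDED;

    Map<String, Long> committableOffsetsAndMetadata() {
        switch (state) {
            case CREATED:
            case RESTORING:
                return Collections.emptyMap();          // nothing consumed yet
            case RUNNING:
            case SUSPENDED:
                // per the comment, SUSPENDED now returns real offsets instead of being rejected
                return Collections.singletonMap("input-topic-0", 100L);
            case CLOSED:
                throw new IllegalStateException("task already closed");
            default:
                throw new IllegalStateException("unknown state " + state);
        }
    }

    public static void main(final String[] args) {
        System.out.println(new CommittableOffsetsSketch().committableOffsetsAndMetadata());
    }
}
```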

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -108,13 +107,20 @@ public void completeRestoration() {
     }
 
     @Override
-    public void prepareSuspend() {
-        log.trace("No-op prepareSuspend with state {}", state());
+    public void suspendDirty() {
+        log.trace("No-op suspend dirty with state {}", state());
+        if (state() == State.RUNNING) {

Review comment:
       My proposal is to keep the methods idempotent.
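
   The idempotency idea in a small self-contained form, mirroring the state guard from the `StandbyTask` diff above (toy class, not the real task):

```java
// Minimal model of an idempotent suspendDirty(): the state guard makes repeated
// calls harmless.
public class IdempotentSuspendSketch {

    enum State { RUNNING, SUSPENDED }

    private State state = State.RUNNING;

    void suspendDirty() {
        if (state == State.RUNNING) {       // only transition once
            state = State.SUSPENDED;
            System.out.println("transitioned RUNNING -> SUSPENDED");
        } else {
            System.out.println("already " + state + ", nothing to do");
        }
    }

    public static void main(final String[] args) {
        final IdempotentSuspendSketch task = new IdempotentSuspendSketch();
        task.suspendDirty(); // transitions
        task.suspendDirty(); // no-op -- safe to call again
    }
}
```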

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -260,8 +220,8 @@ private void close(final boolean clean) {
                 log.trace("Skip closing since state is {}", state());
                 return;
 
-            case RESTORING:
-            case SUSPENDED:
+            case RESTORING: // a StandbyTask is never in RESTORING state

Review comment:
       We can, but does it help much?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -246,82 +245,39 @@ public void completeRestoration() {
         }
     }
 
-    /**
-     * <pre>
-     * the following order must be followed:
-     *  1. first close topology to make sure all cached records in the topology are processed
-     *  2. then flush the state, send any left changelog records
-     *  3. then flush the record collector
-     *  4. then commit the record collector -- for EOS this is the synchronization barrier
-     *  5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed
-     * </pre>
-     *
-     * @throws TaskMigratedException if committing offsets failed (non-EOS)
-     *                               or if the task producer got fenced (EOS)
-     */
     @Override
-    public void prepareSuspend() {
-        switch (state()) {
-            case CREATED:
-            case SUSPENDED:
-                // do nothing
-                log.trace("Skip prepare suspending since state is {}", state());
-
-                break;
-
-            case RESTORING:
-                stateMgr.flush();
-                log.info("Prepare suspending restoring");
-
-                break;
-
-            case RUNNING:
-                closeTopology(true);
-
-                stateMgr.flush();
-                recordCollector.flush();
-
-                log.info("Prepare suspending running");
-
-                break;
-
-            case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
-
-            default:
-                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);
-        }
+    public void suspendDirty() {
+        log.info("Suspending dirty");
+        suspend(false);
     }
 
     @Override
-    public void suspend() {
+    public void suspendCleanAndPrepareCommit() {

Review comment:
       The downside is that the caller must make sure to call both methods (in 3 places in the code atm). We can also decouple them later if required. I don't have a very strong opinion about it; the idea was just to make it simpler for the caller.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -389,30 +346,27 @@ public void prepareCommit() {
     @Override
     public void postCommit() {
         switch (state()) {
+            case RESTORING:
             case RUNNING:
-                commitNeeded = false;
+            case SUSPENDED:
                 commitRequested = false;
+                commitNeeded = false;
 
-                if (!eosEnabled) {
-                    stateMgr.checkpoint(checkpointableOffsets());
+                if (state() == State.RESTORING) {

Review comment:
       We could. I thought it's easier to read right now compared to `if (state() == State.RESTORING || !eosEnabled)` -- I don't care too much.
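
   For reference, the two equivalent shapes being discussed, as a self-contained sketch (the `SUSPENDED` handling debated in the other comments is left out here):

```java
// The two equivalent checkpoint conditions from the discussion, side by side.
public class CheckpointConditionSketch {

    enum State { RESTORING, RUNNING, SUSPENDED }

    // Shape used in the PR: arguably easier to read.
    static boolean shouldCheckpointNested(final State state, final boolean eosEnabled) {
        if (state == State.RESTORING) {
            return true;
        } else if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
            return true;
        }
        return false;
    }

    // Suggested single-condition alternative.
    static boolean shouldCheckpointFlat(final State state, final boolean eosEnabled) {
        return state == State.RESTORING || !eosEnabled;
    }

    public static void main(final String[] args) {
        for (final State s : State.values()) {
            for (final boolean eos : new boolean[] {true, false}) {
                assert shouldCheckpointNested(s, eos) == shouldCheckpointFlat(s, eos);
                System.out.println(s + ", eos=" + eos + " -> " + shouldCheckpointFlat(s, eos));
            }
        }
    }
}
```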

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -389,30 +346,27 @@ public void prepareCommit() {
     @Override
     public void postCommit() {
         switch (state()) {
+            case RESTORING:
             case RUNNING:
-                commitNeeded = false;
+            case SUSPENDED:
                 commitRequested = false;
+                commitNeeded = false;
 
-                if (!eosEnabled) {
-                    stateMgr.checkpoint(checkpointableOffsets());
+                if (state() == State.RESTORING) {
+                    writeCheckpointIfNeed();
+                } else if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos

Review comment:
       Hmmm... For `SUSPENDED`, we should actually not write a checkpoint, from my understanding? Will update the PR accordingly.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -389,30 +346,27 @@ public void prepareCommit() {
     @Override
     public void postCommit() {
         switch (state()) {
+            case RESTORING:
             case RUNNING:
-                commitNeeded = false;
+            case SUSPENDED:
                 commitRequested = false;
+                commitNeeded = false;
 
-                if (!eosEnabled) {
-                    stateMgr.checkpoint(checkpointableOffsets());
+                if (state() == State.RESTORING) {
+                    writeCheckpointIfNeed();
+                } else if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
+                    writeCheckpointIfNeed();
                 }
 
-                log.debug("Committed");
-
-                break;
-
-            case RESTORING:
-                commitNeeded = false;
-                commitRequested = false;
-
-                stateMgr.checkpoint(checkpointableOffsets());
+                if (state() == State.SUSPENDED) {
+                    partitionGroup.clear();

Review comment:
       Good call.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -506,19 +455,18 @@ public void update(final Set<TopicPartition> topicPartitions, final ProcessorTop
 
     @Override
     public void closeAndRecycleState() {
-        prepareClose(true);
-
+        suspendCleanAndPrepareCommit();
         writeCheckpointIfNeed();
 
         switch (state()) {
             case CREATED:
-            case RUNNING:
-            case RESTORING:
             case SUSPENDED:
                 stateMgr.recycle();
                 recordCollector.close();
                 break;
 
+            case RESTORING: // we should have transitioned to `SUSPENDED` already

Review comment:
       What does it improve?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -534,62 +482,38 @@ public void closeAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    /**
-     * <pre>
-     * the following order must be followed:
-     *  1. first close topology to make sure all cached records in the topology are processed
-     *  2. then flush the state, send any left changelog records
-     *  3. then flush the record collector
-     * </pre>
-     *
-     * @param clean    shut down cleanly (ie, incl. flush) if {@code true} --
-     *                 otherwise, just close open resources
-     * @throws TaskMigratedException if the task producer got fenced (EOS)
-     */
-    private void prepareClose(final boolean clean) {
-        // Reset any previously scheduled checkpoint.
-        checkpoint = null;
-
+    private void maybeScheduleCheckpoint() {
         switch (state()) {
-            case CREATED:
-                // the task is created and not initialized, just re-write the checkpoint file
-                scheduleCheckpoint(emptyMap());
+            case RESTORING:
+                this.checkpoint = checkpointableOffsets();
+
                 break;
 
             case RUNNING:
-                closeTopology(clean);
-
-                if (clean) {
-                    stateMgr.flush();
-                    recordCollector.flush();
-                    scheduleCheckpoint(checkpointableOffsets());
-                } else {
-                    executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
+                if (!eosEnabled) {
+                    this.checkpoint = checkpointableOffsets();
                 }
 
                 break;
 
-            case RESTORING:
-                executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
-                scheduleCheckpoint(emptyMap());
+            case SUSPENDED:
+                this.checkpoint = Collections.emptyMap();
 
                 break;
 
-            case SUSPENDED:
+            case CREATED:
             case CLOSED:
-                // not need to checkpoint, since when suspending we've already committed the state
-                break;
+                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
 
             default:
-                throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id);
+                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
         }
     }
 
-    private void scheduleCheckpoint(final Map<TopicPartition, Long> checkpoint) {
-        this.checkpoint = checkpoint;
-    }
-
     private void writeCheckpointIfNeed() {
+        if (commitNeeded) {
+            throw new IllegalStateException("A checkpoint should only be written if now commit is needed.");

Review comment:
       Should be `if no commit is needed` (i.e., if we need to commit, we need to commit first -- writing a checkpoint while a commit is still needed means we would write the checkpoint too early).
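
   With the wording fixed, the guard would look roughly like this; `commitNeeded` and the map below are stand-ins for the real `StreamTask` fields:

```java
import java.util.Collections;
import java.util.Map;

// Sketch of writeCheckpointIfNeed() with the corrected message: a checkpoint may
// only be written once no commit is outstanding, otherwise it would be written too early.
public class WriteCheckpointSketch {

    private boolean commitNeeded = false;
    private Map<String, Long> checkpoint = Collections.singletonMap("store-changelog-0", 42L);

    void writeCheckpointIfNeed() {
        if (commitNeeded) {
            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
        }
        if (checkpoint != null) {
            System.out.println("checkpointing " + checkpoint);
            checkpoint = null;
        }
    }

    public static void main(final String[] args) {
        new WriteCheckpointSketch().writeCheckpointIfNeed(); // ok: no commit pending
    }
}
```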

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -891,6 +821,9 @@ private void initializeTopology() {
     }
 
     private void closeTopology(final boolean clean) {
+        if (state() != State.RUNNING) {

Review comment:
       Because the topology is only initialized when restoring is finished.
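
   A toy sketch of that guard: only a `RUNNING` task ever initialized its topology, so any other state just returns (not the real `StreamTask`):

```java
// Toy model of the closeTopology() guard: the topology is only initialized once
// restoration completes, so there is nothing to close in any non-RUNNING state.
public class CloseTopologySketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.RESTORING;

    void closeTopology(final boolean clean) {
        if (state != State.RUNNING) {
            System.out.println("state is " + state + " -- topology never initialized, skip close");
            return;
        }
        System.out.println("closing topology, clean=" + clean);
    }

    public static void main(final String[] args) {
        new CloseTopologySketch().closeTopology(true); // RESTORING -> skipped
    }
}
```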

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -389,30 +346,27 @@ public void prepareCommit() {
     @Override
     public void postCommit() {
         switch (state()) {
+            case RESTORING:
             case RUNNING:
-                commitNeeded = false;
+            case SUSPENDED:
                 commitRequested = false;
+                commitNeeded = false;
 
-                if (!eosEnabled) {
-                    stateMgr.checkpoint(checkpointableOffsets());
+                if (state() == State.RESTORING) {
+                    writeCheckpointIfNeed();
+                } else if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos

Review comment:
       For `SUSPENDED`, we should always write the checkpoint. Fixing.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) {
 
     void postCommit();
 
-    /**

Review comment:
       That is certainly possible. Good catch!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) {
 
     void postCommit();
 
-    /**
-     * @throws TaskMigratedException all the task has been migrated
-     * @throws StreamsException fatal error, should close the thread
-     */
-    void prepareSuspend();
+    void suspendDirty();

Review comment:
       We also want to swallow exceptions in `closeTopology()`, but we can work around this.
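
   One way to get the swallow-on-dirty behavior, sketched with a locally defined helper in the spirit of the `executeAndMaybeSwallow(...)` calls visible in the diff above; the helper below is illustrative, not the real utility method:

```java
// Illustrative helper mirroring the executeAndMaybeSwallow(...) pattern from the diff:
// on a clean path exceptions propagate, on a dirty path they are logged and swallowed.
public class SwallowOnDirtySketch {

    static void executeAndMaybeSwallow(final boolean clean, final Runnable action, final String name) {
        try {
            action.run();
        } catch (final RuntimeException e) {
            if (clean) {
                throw e;
            }
            System.out.println("ignoring error while " + name + " (dirty path): " + e.getMessage());
        }
    }

    public static void main(final String[] args) {
        final Runnable closeTopology = () -> {
            throw new RuntimeException("boom");
        };
        executeAndMaybeSwallow(false, closeTopology, "closing topology"); // swallowed
        try {
            executeAndMaybeSwallow(true, closeTopology, "closing topology"); // rethrown
        } catch (final RuntimeException expected) {
            System.out.println("clean path rethrew: " + expected.getMessage());
        }
    }
}
```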




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org