Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/09 20:52:20 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #9720: KAFKA-10555: Improve client state machine

wcarlson5 opened a new pull request #9720:
URL: https://github.com/apache/kafka/pull/9720


   Removes the transition to ERROR when no stream threads are left running, and makes ERROR a terminal state.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r540486098



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1142,11 +1128,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {

Review comment:
       We want `close()` to be idempotent: it should not throw an exception, but it should log a warning. The warning applies only to `close()`, which is why these logs are not in the `setState()` method.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -594,9 +581,8 @@ public synchronized void onChange(final Thread thread,
                     if (newState == GlobalStreamThread.State.RUNNING) {
                         maybeSetRunning();
                     } else if (newState == GlobalStreamThread.State.DEAD) {
-                        if (setState(State.ERROR)) {

Review comment:
       The outcome is the same, except that for now we close the client. An option to replace the global thread may be added later.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -411,7 +405,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
                 errorInjectedClient1.set(false);
                 stateTransitions1.clear();
                 streams1Alpha.close();
-                waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
+                final KafkaStreams finalStreams1Alpha = streams1Alpha;
+                waitForCondition(() -> finalStreams1Alpha.state() == State.ERROR, "Stream did not go to ERROR");

Review comment:
       Just wait for ERROR, because a crashed client is no longer closed.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
     }
 
     @Test
-    @SuppressWarnings("deprecation")

Review comment:
       remove deprecation

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -532,22 +537,6 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
             this.threadStatesLock = new Object();
         }
 
-        /**
-         * If all threads are dead set to ERROR
-         */
-        private void maybeSetError() {

Review comment:
       This will not be needed with the new error definition

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -546,7 +546,7 @@ public void run() {
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException      if the store's change log does not contain the partition
      */
-    void runLoop() {
+    boolean runLoop() {

Review comment:
       This will let Streams shut down uncleanly when in EOS mode.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -418,70 +418,6 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In
         Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
     }
 
-    @Test

Review comment:
       This test is for the functionality we are removing

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception {
                     "Thread never stopped.");
                 streams.threads.get(i).join();
             }
-            TestUtils.waitForCondition(

Review comment:
       There is no transition to Error here anymore

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated

Review comment:
       remove deprecated handler

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -139,7 +139,7 @@ public void shouldShutdownClient() throws InterruptedException {
             StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
             produceMessages(0L, inputTopic, "A");
-            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);

Review comment:
       When the client is shut down, it now goes to ERROR.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -625,8 +561,7 @@ public void shouldNotAddThreadWhenError() {
         final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         final int oldSize = streams.threads.size();
         streams.start();
-        streamThreadOne.shutdown();
-        streamThreadTwo.shutdown();
+        globalStreamThread.shutdown();

Review comment:
       Removing stream threads no longer puts the client in ERROR; losing the global thread does.
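
The behaviour described in this comment can be illustrated with a minimal, self-contained sketch. `ThreadDeathSketch`, its `ClientState` enum, and both callback names are hypothetical stand-ins chosen for illustration; they are not the actual `KafkaStreams` internals.

```java
// Hypothetical stand-in for the client; names and structure are assumptions, not Kafka code.
class ThreadDeathSketch {
    enum ClientState { RUNNING, PENDING_ERROR, ERROR }

    private ClientState state = ClientState.RUNNING;
    private int liveStreamThreads;

    ThreadDeathSketch(final int streamThreads) {
        this.liveStreamThreads = streamThreads;
    }

    // A dying stream thread only lowers the live-thread count; the client state is untouched.
    synchronized void onStreamThreadDead() {
        if (liveStreamThreads > 0) {
            liveStreamThreads--;
        }
    }

    // A dying global thread closes the client and ends in the terminal ERROR state.
    synchronized void onGlobalThreadDead() {
        if (state == ClientState.RUNNING) {
            state = ClientState.PENDING_ERROR; // resources would be released here
            state = ClientState.ERROR;
        }
    }

    synchronized ClientState state() {
        return state;
    }

    synchronized int liveStreamThreads() {
        return liveStreamThreads;
    }

    public static void main(final String[] args) {
        final ThreadDeathSketch client = new ThreadDeathSketch(2);
        client.onStreamThreadDead();
        client.onStreamThreadDead();
        System.out.println(client.state() + " with " + client.liveStreamThreads() + " live threads");
        client.onGlobalThreadDead();
        System.out.println(client.state());
    }
}
```

In this sketch a client with zero live stream threads is still reported as running, which is why a test that wants to provoke the error state has to stop the global thread instead.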

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       There was a problem with the cleaner thread sometimes wiping out old segments because they were expired

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       You no longer need to close a crashed client.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562200891



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -978,7 +971,7 @@ private void waitForRunning(final List<KeyValue<KafkaStreams.State, KafkaStreams
         waitForCondition(
             () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
             MAX_WAIT_TIME_MS,
-            () -> "Client did not startup on time. Observers transitions: " + observed
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       Why this change? We do wait for `RUNNING`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
         );
     }
 
+    private void waitForStateTransitionContains(final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed,
+                                        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected)
+            throws Exception {
+
+        waitForCondition(
+            () -> observed.containsAll(expected),
+            MAX_WAIT_TIME_MS,
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       Can we add the expected transitions, too? Easier to debug if the test fails.
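
One way to act on this suggestion is sketched below. To stay self-contained it uses a small polling helper written for this example rather than the real test utility, and it models transitions as plain strings; `WaitSketch` and both method names are hypothetical.

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper class; it does not use or reproduce Kafka's TestUtils.
class WaitSketch {
    // Polls the condition until it holds or the deadline passes, then fails with the supplied details.
    static void waitForCondition(final BooleanSupplier condition,
                                 final long maxWaitMs,
                                 final Supplier<String> details) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms. " + details.get());
            }
            try {
                Thread.sleep(10L);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting", e);
            }
        }
    }

    // Waits until every expected transition was observed; the failure message lists both sides.
    static <T> void waitForTransitionsContain(final List<T> observed,
                                              final List<T> expected,
                                              final long maxWaitMs) {
        waitForCondition(
            () -> observed.containsAll(expected),
            maxWaitMs,
            () -> "Client did not have the expected state transitions on time."
                + " Expected transitions: " + expected
                + ". Observed transitions: " + observed
        );
    }
}
```

If another thread appends to `observed` while the helper polls, the caller should pass a thread-safe list.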

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       The PR you linked seems to be unrelated to this test.
   
   Still wondering if we should extract this change to a dedicated PR and cherry-pick to older branches? -- Or do we have a good explanation why older branches would not be affected?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
         );
     }
 
+    private void waitForStateTransitionContains(final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed,
+                                        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected)

Review comment:
       nit: fix indentation







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r556195903



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -201,32 +201,33 @@
      *         |              |                  |
      *         |              v                  |
      *         |       +------+-------+     +----+-------+
-     *         +-----&gt; | Pending      |&lt;--- | Error (5)  |
-     *                 | Shutdown (3) |     +------------+
-     *                 +------+-------+
-     *                        |
-     *                        v
-     *                 +------+-------+
-     *                 | Not          |
-     *                 | Running (4)  |
+     *         +-----&gt; | Pending      |     | Pending    |
+     *                 | Shutdown (3) |     | Error (5)  |
+     *                 +------+-------+     +-----+------+
+     *                        |                   |
+     *                        v                   v
+     *                 +------+-------+     +-----+--------+
+     *                 | Not          |     | Error (6)    |
+     *                 | Running (4)  |     +--------------+
      *                 +--------------+
      *
      *
      * </pre>
      * Note the following:
      * - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED or PARTITIONS_ASSIGNED state
      * - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
-     * - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)
+     * - Any state except NOT_RUNNING, PENDING_ERROR or ERROR can go to PENDING_SHUTDOWN (whenever close is called)
      * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
-     *   the instance will be in the ERROR state. The user will need to close it.
+     *   the instance will be in the ERROR state. The user will not need to close it.

Review comment:
       Thanks for clarifying!
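
The revised diagram and notes can be read as a transition table. The sketch below is one hedged interpretation: `ClientStateSketch` is a hypothetical class, and the assumption that only REBALANCING and RUNNING can move to PENDING_ERROR is not visible in the quoted hunk.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the client state machine; not the real KafkaStreams.State.
class ClientStateSketch {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Allowed successors per state. Entries leading to PENDING_ERROR are an assumption.
    private static final Map<State, Set<State>> NEXT = new EnumMap<>(State.class);

    static {
        NEXT.put(State.CREATED, EnumSet.of(State.REBALANCING, State.PENDING_SHUTDOWN));
        NEXT.put(State.REBALANCING, EnumSet.of(State.RUNNING, State.PENDING_SHUTDOWN, State.PENDING_ERROR));
        NEXT.put(State.RUNNING, EnumSet.of(State.REBALANCING, State.PENDING_SHUTDOWN, State.PENDING_ERROR));
        NEXT.put(State.PENDING_SHUTDOWN, EnumSet.of(State.NOT_RUNNING));
        NEXT.put(State.PENDING_ERROR, EnumSet.of(State.ERROR));
        NEXT.put(State.NOT_RUNNING, EnumSet.noneOf(State.class)); // terminal
        NEXT.put(State.ERROR, EnumSet.noneOf(State.class));       // terminal
    }

    static boolean isValidTransition(final State from, final State to) {
        return NEXT.get(from).contains(to);
    }

    static boolean isTerminal(final State state) {
        return NEXT.get(state).isEmpty();
    }

    public static void main(final String[] args) {
        System.out.println(isValidTransition(State.RUNNING, State.PENDING_ERROR));
        System.out.println(isValidTransition(State.ERROR, State.PENDING_SHUTDOWN));
        System.out.println(isTerminal(State.ERROR));
    }
}
```

Keeping the successors in one table makes the "ERROR is terminal" rule checkable in a single place: a terminal state is simply one with an empty successor set.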







[GitHub] [kafka] wcarlson5 commented on pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-764986776


   I extracted the https://github.com/apache/kafka/pull/9720/files#r562203226 fix to https://github.com/apache/kafka/pull/9948





[GitHub] [kafka] cadonna commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r561734967



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       As far as I can see, where the return value is used the javadoc says
   
   ```
   true if all threads were successfully stopped, false if the timeout was reached.
   ```
   
   Since all threads were successfully stopped, I would return `true`. We clearly document that `ERROR` is a terminal state, so I do not see why somebody should wait for `NOT_RUNNING` when the client is in `ERROR` and `close()` returns `true`.
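
The return-value contract proposed here can be sketched in isolation. `CloseSketch` is a hypothetical miniature client written for this example; it leaves out timeouts and real resource handling, so the `false` branch (timeout reached) never occurs in it.

```java
// Hypothetical miniature client; only illustrates the close() contract discussed above.
class CloseSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private State state = State.RUNNING;
    private int shutdowns = 0;

    synchronized State state() {
        return state;
    }

    // Number of times resources were actually released.
    synchronized int shutdowns() {
        return shutdowns;
    }

    // Simulates a fatal failure: resources are released and the client ends in ERROR.
    synchronized void fail() {
        if (state == State.RUNNING) {
            state = State.PENDING_ERROR;
            shutdowns++;
            state = State.ERROR;
        }
    }

    // Returns true once all threads are stopped; on a terminal state it is a no-op.
    synchronized boolean close() {
        if (state == State.ERROR || state == State.NOT_RUNNING) {
            return true; // already stopped, nothing left to release
        }
        state = State.PENDING_SHUTDOWN;
        shutdowns++;
        state = State.NOT_RUNNING;
        return true;
    }
}
```

Under this contract a caller that sees `true` knows the threads are stopped, and can check `state()` if it needs to tell ERROR apart from NOT_RUNNING.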







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r556196884



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       I don't understand why other branches would not be affected, or to what extent this PR would expose a new issue. Can you elaborate? -- If this PR introduces an issue that requires this change, I agree that we don't need to port it to other branches -- but I don't see the connection. What am I missing?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547563654



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception {
                     "Thread never stopped.");
                 streams.threads.get(i).join();
             }
-            TestUtils.waitForCondition(

Review comment:
       Should we add a check that the thread count went down to zero? We could use the corresponding metrics.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547564580



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -388,7 +382,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
                 errorInjectedClient1.set(false);
                 stateTransitions1.clear();
                 streams1Alpha.close();
-                waitForStateTransition(stateTransitions1, CLOSE_CRASHED);

Review comment:
       Why not just change this from `CLOSE_CRASHED` to `CRASH` ?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547566663



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -737,7 +729,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
                 errorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2AlphaTwo.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);
+                waitForStateTransition(stateTransitions2, CLOSE);

Review comment:
       As above







[GitHub] [kafka] wcarlson5 commented on pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-767090922


   @mjsax it looks like there is one unrelated failure `java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'primary.test-topic-2' already exists.
   `





[GitHub] [kafka] mjsax commented on pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-764959796


   Btw: to what extent do we need to update the docs? We should at least add a section to `streams/upgrade_guide.html` to mention the change.





[GitHub] [kafka] mjsax merged pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9720:
URL: https://github.com/apache/kafka/pull/9720


   





[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547567518



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -128,8 +128,8 @@ public void shouldShutdownThreadUsingOldHandler() throws InterruptedException {
             TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
             // should call the UncaughtExceptionHandler after rebalancing to another thread
             TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time");
-            // the stream should now turn into ERROR state after 2 threads are dead
-            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
+            // there is no threads running but the client is still in running
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       Should we add a metric check to verify that the thread count is zero?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547565615



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -521,12 +516,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);

Review comment:
       Why do we remove this? If the client fails, we should transit to PENDING_ERROR state first, and only afterwards move forward with the test.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547566495



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       Why this change? client2 should transit from `PENDING_ERROR` to `ERROR` (not from `PENDING_SHUTDOWN` to `CLOSED`), and thus the new `CLOSE` check seems not to be right. Or am I missing something?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r553023092



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
     }
 
     @Test
-    @SuppressWarnings("deprecation")

Review comment:
       Ah. Thanks for clarifying.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562043444



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       Alright I adjusted the close response to align with this.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -978,7 +971,7 @@ private void waitForRunning(final List<KeyValue<KafkaStreams.State, KafkaStreams
         waitForCondition(
             () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
             MAX_WAIT_TIME_MS,
-            () -> "Client did not startup on time. Observers transitions: " + observed
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       We do wait for running. I was thinking of bringing it in line with the other methods below, but that doesn't make it any more useful, so I will revert it.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
         );
     }
 
+    private void waitForStateTransitionContains(final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed,
+                                        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected)
+            throws Exception {
+
+        waitForCondition(
+            () -> observed.containsAll(expected),
+            MAX_WAIT_TIME_MS,
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       sure that is fine







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547565615



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -521,12 +516,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);

Review comment:
       Why do we remove this? If the client fails, we should transit to the ERROR state first, and only afterwards move forward with the test.







[GitHub] [kafka] wcarlson5 commented on pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-767090922


   @mjsax it looks like there is one unrelated failure: `java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'primary.test-topic-2' already exists.`





[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r553025964



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -201,32 +201,33 @@
      *         |              |                  |
      *         |              v                  |
      *         |       +------+-------+     +----+-------+
-     *         +-----&gt; | Pending      |&lt;--- | Error (5)  |
-     *                 | Shutdown (3) |     +------------+
-     *                 +------+-------+
-     *                        |
-     *                        v
-     *                 +------+-------+
-     *                 | Not          |
-     *                 | Running (4)  |
+     *         +-----&gt; | Pending      |     | Pending    |
+     *                 | Shutdown (3) |     | Error (5)  |
+     *                 +------+-------+     +-----+------+
+     *                        |                   |
+     *                        v                   v
+     *                 +------+-------+     +-----+--------+
+     *                 | Not          |     | Error (6)    |
+     *                 | Running (4)  |     +--------------+
      *                 +--------------+
      *
      *
      * </pre>
      * Note the following:
      * - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED or PARTITIONS_ASSIGNED state
      * - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
-     * - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)
+     * - Any state except NOT_RUNNING, PENDING_ERROR or ERROR can go to PENDING_SHUTDOWN (whenever close is called)
      * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
-     *   the instance will be in the ERROR state. The user will need to close it.
+     *   the instance will be in the ERROR state. The user will not need to close it.

Review comment:
       I thought we always need to call close? If an error happens, we call the handler, and if the handler returns shutdown, we transit to `PENDING_ERROR`. On `close()` we transit from `PENDING_ERROR -> ERROR`?
   
   Or do I have some misconception?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r553025516



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       From my understanding, and according to the new state transition diagram, if there is an error we go to `PENDING_ERROR`, and if we call `close()` we transit `PENDING_ERROR -> ERROR`?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547556180



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       Should we return `true` there? I understand that we are not in `NOT_RUNNING` state, but in the end we are in a terminal state and we did clean up all resources. I guess I am raising the question of whether we should relax the definition of this return value?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547548275



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -226,7 +226,8 @@
         RUNNING(1, 2, 3, 5),    // 2
         PENDING_SHUTDOWN(4),    // 3
         NOT_RUNNING,            // 4
-        ERROR(3);               // 5
+        PENDING_ERROR(6),       // 5
+        ERROR;               // 6

Review comment:
       super nit: fix indentation of comment
   
   Also please update the state diagram in the comment above.
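
For readers following the enum change, here is a minimal sketch of how ordinal-based transition lists like the ones in this hunk can be checked. `ClientStateSketch` is a hypothetical stand-in for `KafkaStreams.State`, and the `CREATED` and `REBALANCING` rows are not part of the quoted hunk, so treat them as assumptions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for KafkaStreams.State; each constant lists the
// ordinals of the states it may transition to.
enum ClientStateSketch {
    CREATED(1, 3),          // 0 (assumed, not shown in the quoted hunk)
    REBALANCING(2, 3, 5),   // 1 (assumed, not shown in the quoted hunk)
    RUNNING(1, 2, 3, 5),    // 2
    PENDING_SHUTDOWN(4),    // 3
    NOT_RUNNING,            // 4
    PENDING_ERROR(6),       // 5
    ERROR;                  // 6

    private final Set<Integer> validTransitions = new HashSet<>();

    ClientStateSketch(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    // A transition is allowed only if the target ordinal is listed for this state.
    boolean isValidTransition(final ClientStateSketch newState) {
        return validTransitions.contains(newState.ordinal());
    }

    // A state with no outgoing transitions is terminal.
    boolean isTerminal() {
        return validTransitions.isEmpty();
    }
}
```

Under this encoding both `NOT_RUNNING` and `ERROR` have no outgoing transitions, and `PENDING_ERROR` can only move to `ERROR`, which matches the updated diagram discussed in this thread.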







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547564223



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
     }
 
     @Test
-    @SuppressWarnings("deprecation")

Review comment:
       Why did we even set the exception handler that does not do anything? 🤔 













[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r560413676



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       It looks like this issue is fixed by another PR: https://github.com/apache/kafka/pull/9733#issuecomment-761242084 So I am closing this now.







[GitHub] [kafka] wcarlson5 commented on pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-764986776


   I extracted the https://github.com/apache/kafka/pull/9720/files#r562203226 fix to https://github.com/apache/kafka/pull/9948





[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r553022798



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -557,12 +490,12 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
                 "Thread never stopped.");
             globalStreamThread.join();
-            assertEquals(streams.state(), KafkaStreams.State.ERROR);
+            assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);
         } finally {
             streams.close();
         }
 
-        assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+        assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);

Review comment:
       I guess it would not hurt. Leave it up to you then.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547563917



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -557,12 +490,12 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
                 "Thread never stopped.");
             globalStreamThread.join();
-            assertEquals(streams.state(), KafkaStreams.State.ERROR);
+            assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);
         } finally {
             streams.close();
         }
 
-        assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+        assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);

Review comment:
       If the global thread dies, should we not transit to `ERROR` at the end?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r553613699



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       The call to `close()` will have no effect. The `PENDING_ERROR -> ERROR` transition is like `PENDING_SHUTDOWN -> NOT_RUNNING`, where the transition out of the `PENDING...` state is automatic.
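
To make the "automatic" part concrete, here is a small sketch, loosely modelled on the `shutdownHelper(final boolean error)` hunk quoted elsewhere in this thread. `ShutdownHelperSketch` and `shutdownAndAwait` are hypothetical names, and setting the pending state inside the helper is a simplification for illustration, not necessarily where the real client does it.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ShutdownHelperSketch {
    // Hypothetical stand-in for the relevant KafkaStreams.State values.
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    State state() {
        return state.get();
    }

    // Enters the pending state right away and returns a thread that, once
    // started, moves to the matching terminal state without any further call.
    Thread shutdownHelper(final boolean error) {
        state.set(error ? State.PENDING_ERROR : State.PENDING_SHUTDOWN);
        return new Thread(() -> {
            // A real client would close its threads, metrics, and other resources here.
            state.set(error ? State.ERROR : State.NOT_RUNNING);
        }, "sketch-close-thread");
    }

    // Convenience for the example: run the helper thread and wait for it to finish.
    State shutdownAndAwait(final boolean error) {
        final Thread closer = shutdownHelper(error);
        closer.start();
        try {
            closer.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return state();
    }
}
```

In this model nothing outside the helper thread has to act for the client to leave the pending state, which is why a test no longer needs a `close()` call to observe `ERROR`.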

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       Maybe we will. It doesn't seem to be a problem for the other branches, but something about the test changes in this PR exposed it. This happened in the handler integration tests too. I think as long as the other tests don't change it won't be a problem, and if those tests change we can fix it then.
   
   But if you think it should be fixed in the other branches anyways I'll trust your judgement.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -201,32 +201,33 @@
      *         |              |                  |
      *         |              v                  |
      *         |       +------+-------+     +----+-------+
-     *         +-----&gt; | Pending      |&lt;--- | Error (5)  |
-     *                 | Shutdown (3) |     +------------+
-     *                 +------+-------+
-     *                        |
-     *                        v
-     *                 +------+-------+
-     *                 | Not          |
-     *                 | Running (4)  |
+     *         +-----&gt; | Pending      |     | Pending    |
+     *                 | Shutdown (3) |     | Error (5)  |
+     *                 +------+-------+     +-----+------+
+     *                        |                   |
+     *                        v                   v
+     *                 +------+-------+     +-----+--------+
+     *                 | Not          |     | Error (6)    |
+     *                 | Running (4)  |     +--------------+
      *                 +--------------+
      *
      *
      * </pre>
      * Note the following:
      * - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED or PARTITIONS_ASSIGNED state
      * - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
-     * - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)
+     * - Any state except NOT_RUNNING, PENDING_ERROR or ERROR can go to PENDING_SHUTDOWN (whenever close is called)
      * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
-     *   the instance will be in the ERROR state. The user will need to close it.
+     *   the instance will be in the ERROR state. The user will not need to close it.

Review comment:
       The handler will call close, but the user will not need to. The `PENDING_ERROR` state indicates that the resources are closing before the transition to `ERROR`, after which no more work will be done. We made it so the user can call close in `PENDING_ERROR` or `ERROR`, but it will only log a warning.







[GitHub] [kafka] cadonna commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r561734967



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       As far as I can see, where the return value is used the javadoc says
   
   ```
   true if all threads were successfully stopped, false if the timeout was reached.
   ```
   
   Since all threads were successfully stopped, I would return `true`. We clearly document that `ERROR` is a terminal state, so I do not see why somebody should wait for `NOT_RUNNING` when the client is in `ERROR` and `close()` returns `true`.
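
A minimal sketch of the return-value contract argued for here, assuming the quoted javadoc is the only requirement. `CloseReturnSketch` and its `simulatedShutdownMs` field are hypothetical and exist only to model a shutdown that may or may not finish within the timeout.

```java
final class CloseReturnSketch {
    // Hypothetical stand-in for the relevant KafkaStreams.State values.
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private State state;
    // Assumption for the sketch: how long stopping all threads would take.
    private final long simulatedShutdownMs;

    CloseReturnSketch(final State initialState, final long simulatedShutdownMs) {
        this.state = initialState;
        this.simulatedShutdownMs = simulatedShutdownMs;
    }

    State state() {
        return state;
    }

    // Returns true if all threads were stopped, false if the timeout was reached.
    boolean close(final long timeoutMs) {
        if (state == State.ERROR) {
            // Terminal state: every thread is already stopped, so report success
            // and leave the state untouched.
            return true;
        }
        if (simulatedShutdownMs > timeoutMs) {
            // Shutdown has started but did not finish within the timeout.
            state = State.PENDING_SHUTDOWN;
            return false;
        }
        state = State.NOT_RUNNING;
        return true;
    }
}
```

A caller that only checks the boolean can then treat `ERROR` and `NOT_RUNNING` alike as "everything has stopped", and inspect `state()` if it needs to tell them apart.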







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r556218997



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -113,14 +114,7 @@
     private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CRASH =
         Collections.unmodifiableList(
             Collections.singletonList(
-                KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.ERROR)
-            )
-        );
-    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CLOSE_CRASHED =
-        Collections.unmodifiableList(
-            Arrays.asList(
-                KeyValue.pair(KafkaStreams.State.ERROR, KafkaStreams.State.PENDING_SHUTDOWN),
-                KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING)
+                KeyValue.pair(KafkaStreams.State.RUNNING, State.PENDING_ERROR)

Review comment:
       Actually, it could probably be just `PENDING_ERROR -> ERROR`, because the client could have been in `RUNNING` or `REBALANCING` previously.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547568019



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
##########
@@ -526,6 +526,6 @@ private static void produceSynchronously(final String topic, final List<KeyValue
 
     private static void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
         waitForCondition(() -> !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down.");
-        assertThat(driver.state(), is(KafkaStreams.State.PENDING_SHUTDOWN));
+        waitForCondition(() -> driver.state() == KafkaStreams.State.ERROR, "finish shutdown");

Review comment:
       Nit: the error message is weird -> `Streams didn't transit to ERROR state`.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r553027410



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       I see. You are talking about the broker side log cleaner thread -- not the local state directory cleaner thread. (windowed stores also use the concept of segments, not just topic-partitions).
   
   However, it seems we want to extract this fix into a separate PR to be able to cherry-pick it to older branches? Seems to be unrelated to this PR.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562223318



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -978,7 +971,7 @@ private void waitForRunning(final List<KeyValue<KafkaStreams.State, KafkaStreams
         waitForCondition(
             () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
             MAX_WAIT_TIME_MS,
-            () -> "Client did not startup on time. Observers transitions: " + observed
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       We do wait for running. I was thinking of making it match the other methods below, but that doesn't make it any more useful, so I will revert it.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r553022481



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       Fair point. Might be good to get input from somebody else in addition. \cc @cadonna 







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562043444



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       Alright I adjusted the close response to align with this.







[GitHub] [kafka] mjsax commented on pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#issuecomment-764959796


   Btw: to what extent do we need to update the docs? We should at least add a section to `streams/upgrade_guide.html` to mention the change.





[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r556196100



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       Thanks for clarifying.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r551458558



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -524,9 +460,6 @@ public void testStateThreadClose() throws Exception {
                     "Thread never stopped.");
                 streams.threads.get(i).join();
             }
-            TestUtils.waitForCondition(

Review comment:
       That's probably a good idea.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -526,12 +521,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
 
-                waitForStateTransition(stateTransitions2, CRASH);
-
                 commitErrorInjectedClient2.set(false);
                 stateTransitions2.clear();
                 streams2Alpha.close();
-                waitForStateTransition(stateTransitions2, CLOSE_CRASHED);

Review comment:
       My thoughts were that we are calling close, so the client is just going to be closed, as the state transitions are cleared. This is the test that made me want you to review the PR: I was worried that I might have broken the integrity of the tests, as it is not behaving as I expected. However, no other test was having this problem, so I was not sure if I was understanding it correctly.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
     }
 
     @Test
-    @SuppressWarnings("deprecation")

Review comment:
       It made it so that the new default was not used until we updated the Error transition as we are doing in this PR. :) 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1165,11 +1151,21 @@ private Thread shutdownHelper(final boolean error) {
             metrics.close();
             if (!error) {
                 setState(State.NOT_RUNNING);
+            } else {
+                setState(State.ERROR);
             }
         }, "kafka-streams-close-thread");
     }
 
     private boolean close(final long timeoutMs) {
+        if (state == State.ERROR) {
+            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+            return false;

Review comment:
       I don't think that we should return true. A user would then expect that the state would change to `NOT_RUNNING` and could be stuck waiting on that. Whereas if we return false, they might retry and get stuck there.
   
   I don't know which is a better problem to have but I think that changing the meaning of this return value won't add any more clarity to the situation. 
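
       To make the trade-off concrete, here is a minimal sketch of caller-side handling, assuming `close` returns false once the client is already in ERROR, as in the hunk above. `SketchState`, `SketchClient`, and `CloseCaller` are hypothetical stand-ins for illustration only, not the real `KafkaStreams` API:

```java
// Hypothetical, simplified states; the real client has more.
enum SketchState { RUNNING, NOT_RUNNING, ERROR }

// Hypothetical stand-in for a client whose ERROR state is terminal.
final class SketchClient {
    private SketchState state;

    SketchClient(final SketchState initial) {
        this.state = initial;
    }

    SketchState state() {
        return state;
    }

    // Mirrors the guard in the hunk: nothing left to close in ERROR, so report false.
    boolean close() {
        if (state == SketchState.ERROR) {
            return false;
        }
        state = SketchState.NOT_RUNNING;
        return true;
    }
}

final class CloseCaller {
    // Calls close() exactly once and reports the resulting state.
    // A false return combined with ERROR is treated as final, so the caller
    // neither retries nor waits for NOT_RUNNING.
    static SketchState closeOnce(final SketchClient client) {
        final boolean closed = client.close();
        if (!closed && client.state() == SketchState.ERROR) {
            return SketchState.ERROR;
        }
        return client.state();
    }
}
```

       Under these assumptions, a caller that inspects the state after a false return avoids both failure modes discussed above: waiting forever for `NOT_RUNNING`, or retrying `close` forever.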

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -557,12 +490,12 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
                 "Thread never stopped.");
             globalStreamThread.join();
-            assertEquals(streams.state(), KafkaStreams.State.ERROR);
+            assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);
         } finally {
             streams.close();
         }
 
-        assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+        assertEquals(streams.state(), KafkaStreams.State.PENDING_ERROR);

Review comment:
       It will. We don't strictly need to test it here, as that is tested elsewhere, but we can add it for clarity.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -226,7 +226,8 @@
         RUNNING(1, 2, 3, 5),    // 2
         PENDING_SHUTDOWN(4),    // 3
         NOT_RUNNING,            // 4
-        ERROR(3);               // 5
+        PENDING_ERROR(6),       // 5
+        ERROR;               // 6

Review comment:
       Yay, ASCII art! Good catch, I forgot about that.
   
   When I build the docs locally it looks about right
   
   ![image](https://user-images.githubusercontent.com/18128741/103559186-9815d680-4e6a-11eb-8e4f-3db522102ff6.png)
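
       For readers following the enum change in the hunk above, a minimal sketch of the transitions it encodes may help. This is an illustration only: `ClientState` is a hypothetical name, it covers just the five states visible in the hunk (the state at index 1 and anything before it are omitted), and it uses named successor sets instead of the ordinal lists in `KafkaStreams.State`:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, reduced model of the states shown in the hunk.
enum ClientState {
    RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR;

    // States reachable from this one. States outside the hunk are left out.
    Set<ClientState> successors() {
        switch (this) {
            case RUNNING:
                return EnumSet.of(RUNNING, PENDING_SHUTDOWN, PENDING_ERROR);
            case PENDING_SHUTDOWN:
                return EnumSet.of(NOT_RUNNING);
            case PENDING_ERROR:
                return EnumSet.of(ERROR);
            default:
                // NOT_RUNNING and ERROR have no successors.
                return EnumSet.noneOf(ClientState.class);
        }
    }

    boolean isValidTransition(final ClientState next) {
        return successors().contains(next);
    }

    // A state is terminal when nothing can follow it.
    boolean isTerminal() {
        return successors().isEmpty();
    }
}
```

       In this model `ERROR` no longer lists `PENDING_SHUTDOWN` as a successor, which is what makes it terminal, and `PENDING_ERROR` is the only way into it.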
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       My theory (based on logs) is that the cleaner thread was sometimes activating and removing the segment before it should, so the record that makes the thread crash was sometimes not consumed by the recovery thread. I just changed the timestamps so that the cleaner thread would not consider them old; even if it did activate, it would not clean them.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##########
@@ -128,8 +128,8 @@ public void shouldShutdownThreadUsingOldHandler() throws InterruptedException {
             TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
             // should call the UncaughtExceptionHandler after rebalancing to another thread
             TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time");
-            // the stream should now turn into ERROR state after 2 threads are dead
-            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
+            // there is no threads running but the client is still in running
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       We can, but we should already be able to see from `counter` that the 2 threads failed.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562224408



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
         );
     }
 
+    private void waitForStateTransitionContains(final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed,
+                                        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected)
+            throws Exception {
+
+        waitForCondition(
+            () -> observed.containsAll(expected),
+            MAX_WAIT_TIME_MS,
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       Sure, that is fine.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547564862



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -113,14 +114,7 @@
     private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CRASH =
         Collections.unmodifiableList(
             Collections.singletonList(
-                KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.ERROR)
-            )
-        );
-    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CLOSE_CRASHED =
-        Collections.unmodifiableList(
-            Arrays.asList(
-                KeyValue.pair(KafkaStreams.State.ERROR, KafkaStreams.State.PENDING_SHUTDOWN),
-                KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING)
+                KeyValue.pair(KafkaStreams.State.RUNNING, State.PENDING_ERROR)

Review comment:
       Should we extend it to have two state transitions, `RUNNING -> PENDING_ERROR` and `PENDING_ERROR -> ERROR` ?







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547566599



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -611,12 +604,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
                 verifyUncommitted(expectedUncommittedResult);
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
-                waitForStateTransition(stateTransitions1, CRASH);

Review comment:
       As above

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -611,12 +604,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
                 verifyUncommitted(expectedUncommittedResult);
 
                 assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
-                waitForStateTransition(stateTransitions1, CRASH);
 
                 commitErrorInjectedClient1.set(false);
                 stateTransitions1.clear();
                 streams1Beta.close();
-                waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
+                waitForStateTransition(stateTransitions1, CLOSE);

Review comment:
       As above







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r562200891



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -978,7 +971,7 @@ private void waitForRunning(final List<KeyValue<KafkaStreams.State, KafkaStreams
         waitForCondition(
             () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
             MAX_WAIT_TIME_MS,
-            () -> "Client did not startup on time. Observers transitions: " + observed
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       Why this change? We do wait for `RUNNING`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
         );
     }
 
+    private void waitForStateTransitionContains(final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed,
+                                        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected)
+            throws Exception {
+
+        waitForCondition(
+            () -> observed.containsAll(expected),
+            MAX_WAIT_TIME_MS,
+            () -> "Client did not have the expected state transition on time. Observers transitions: " + observed

Review comment:
       Can we add the expected transitions, too? Easier to debug if the test fails.
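
       One possible shape for that message, as a minimal sketch rather than the actual test code: `TransitionMessages` and its methods are hypothetical, and transitions are modelled as plain string pairs instead of `KeyValue<KafkaStreams.State, KafkaStreams.State>` so the example stands alone:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of the Kafka test utilities.
final class TransitionMessages {

    // Models one "from -> to" state transition as an immutable pair.
    static Map.Entry<String, String> transition(final String from, final String to) {
        return new SimpleImmutableEntry<>(from, to);
    }

    // Same check as in the hunk: every expected transition was observed,
    // regardless of order or of extra transitions in between.
    static boolean containsAllExpected(final List<Map.Entry<String, String>> observed,
                                       final List<Map.Entry<String, String>> expected) {
        return observed.containsAll(expected);
    }

    // Failure message that reports both sides, to make a failing run easier to debug.
    static String failureMessage(final List<Map.Entry<String, String>> observed,
                                 final List<Map.Entry<String, String>> expected) {
        return "Client did not have the expected state transition on time."
            + " Expected transitions: " + expected
            + ". Observed transitions: " + observed;
    }
}
```

       Because the check is `containsAll`, an expected list holding only `PENDING_ERROR -> ERROR` still matches when other transitions were observed before it, so printing both lists shows exactly which expected entry is missing.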

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       The PR you linked seems to be unrelated to this test.
   
   Still wondering if we should extract this change to a dedicated PR and cherry-pick to older branches? -- Or do we have a good explanation why older branches would not be affected?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -993,6 +986,17 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
         );
     }
 
+    private void waitForStateTransitionContains(final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed,
+                                        final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected)

Review comment:
       nit: fix indentation







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r560529803



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -113,14 +114,7 @@
     private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CRASH =
         Collections.unmodifiableList(
             Collections.singletonList(
-                KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.ERROR)
-            )
-        );
-    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CLOSE_CRASHED =
-        Collections.unmodifiableList(
-            Arrays.asList(
-                KeyValue.pair(KafkaStreams.State.ERROR, KafkaStreams.State.PENDING_SHUTDOWN),
-                KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING)
+                KeyValue.pair(State.PENDING_ERROR, State.ERROR)

Review comment:
       Using just `PENDING_ERROR -> ERROR` because the transition to `PENDING_ERROR` can come from multiple sources. Also, that transition is already tested, so this check implies it.







[GitHub] [kafka] mjsax commented on a change in pull request #9720: KAFKA-10555: Improve client state machine

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9720:
URL: https://github.com/apache/kafka/pull/9720#discussion_r547567210



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -174,8 +177,8 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
     }
 
     @Test
-    @Deprecated
     public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
+        final long time = System.currentTimeMillis();

Review comment:
       Not sure if I understand. Can you elaborate?



