Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/03 01:55:34 UTC

[GitHub] [kafka] mjsax opened a new pull request #9368: KAFKA-9274: Add timeout handling for state restore

mjsax opened a new pull request #9368:
URL: https://github.com/apache/kafka/pull/9368


    - part of KIP-572
   
   If a `TimeoutException` happens while restoring active tasks or updating standby tasks, we need to trigger the `task.timeout.ms` timeout.
   
   Call for review @vvcephei 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r499105607



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -137,4 +146,48 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         this.inputPartitions = topicPartitions;
         topology.updateSourceTopics(nodeToSourceTopics);
     }
+
+    @Override
+    public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
+                                            final TimeoutException timeoutException,
+                                            final Logger log) throws StreamsException {

Review comment:
       We want to use the logger of `StreamTask` or `StandbyTask` to get the proper log prefix; thus, we just pass it into this method.







[GitHub] [kafka] mjsax commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r499105740



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -137,4 +146,48 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         this.inputPartitions = topicPartitions;
         topology.updateSourceTopics(nodeToSourceTopics);
     }
+
+    @Override
+    public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
+                                            final TimeoutException timeoutException,
+                                            final Logger log) throws StreamsException {
+        if (deadlineMs == NO_DEADLINE) {
+            deadlineMs = currentWallClockMs + taskTimeoutMs;
+        } else if (currentWallClockMs > deadlineMs) {
+            final String errorMessage = String.format(
+                "Task %s did not make progress within %d ms. Adjust `%s` if needed.",
+                id,
+                currentWallClockMs - deadlineMs + taskTimeoutMs,
+                StreamsConfig.TASK_TIMEOUT_MS_CONFIG
+            );
+
+            if (timeoutException != null) {

Review comment:
       I added this null check because for `poll()` we would not get a `TimeoutException` at the moment (even though I am holding off on tackling `poll()` for now).
   
   We could also change this and assume that `timeoutException` is never `null`.







[GitHub] [kafka] mjsax commented on pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#issuecomment-709641131


   Updated.





[GitHub] [kafka] mjsax commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r499105759



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -137,4 +146,48 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         this.inputPartitions = topicPartitions;
         topology.updateSourceTopics(nodeToSourceTopics);
     }
+
+    @Override
+    public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
+                                            final TimeoutException timeoutException,
+                                            final Logger log) throws StreamsException {
+        if (deadlineMs == NO_DEADLINE) {
+            deadlineMs = currentWallClockMs + taskTimeoutMs;
+        } else if (currentWallClockMs > deadlineMs) {
+            final String errorMessage = String.format(
+                "Task %s did not make progress within %d ms. Adjust `%s` if needed.",
+                id,
+                currentWallClockMs - deadlineMs + taskTimeoutMs,
+                StreamsConfig.TASK_TIMEOUT_MS_CONFIG
+            );
+
+            if (timeoutException != null) {
+                throw new TimeoutException(errorMessage, timeoutException);
+            } else {
+                throw new TimeoutException(errorMessage);
+            }
+        }
+
+        if (timeoutException != null) {
+            log.debug(
+                "Timeout exception. Remaining time to deadline {}; retrying.",
+                deadlineMs - currentWallClockMs,
+                timeoutException
+            );
+        } else {
+            log.debug(
+                "Task did not make progress. Remaining time to deadline {}; retrying.",
+                deadlineMs - currentWallClockMs
+            );
+        }
+
+    }
+
+    @Override
+    public void clearTaskTimeout(final Logger log) {
+        if (deadlineMs != NO_DEADLINE) {

Review comment:
       Because we call this reset logic "blindly", we added this guard to avoid spamming the logs.
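
The deadline handling discussed in this thread can be sketched in isolation as follows. This is only an illustrative stand-alone version, not the code from the PR: the class `TaskDeadline`, its method names, and the use of `IllegalStateException` in place of Kafka's `TimeoutException` are assumptions made so the example compiles without Kafka on the classpath.

```java
// Hypothetical sketch: TaskDeadline is NOT a Kafka class; it only mirrors the
// deadline logic shown in the diff above.
class TaskDeadline {
    static final long NO_DEADLINE = -1L;

    private final long taskTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    TaskDeadline(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /**
     * Starts the deadline on the first failure; fails once the deadline has passed.
     *
     * @throws IllegalStateException (stand-in for Kafka's TimeoutException)
     *         if {@code nowMs} is past the deadline
     */
    void maybeInitOrThrow(final long nowMs, final Exception cause) {
        if (deadlineMs == NO_DEADLINE) {
            // First failure: remember by when progress must be made.
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs > deadlineMs) {
            final String message = String.format(
                "No progress within %d ms.", nowMs - deadlineMs + taskTimeoutMs);
            // A null cause is permitted by this constructor.
            throw new IllegalStateException(message, cause);
        }
    }

    /**
     * Resets the deadline after progress was made.
     *
     * @return true only if a deadline was actually set, so a caller can
     *         log the reset once instead of on every successful call
     */
    boolean clear() {
        if (deadlineMs != NO_DEADLINE) {
            deadlineMs = NO_DEADLINE;
            return true;
        }
        return false;
    }
}
```

Returning a boolean from `clear()` is one way to express the guard: callers that reset "blindly" after every successful operation would only log when the return value is `true`.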







[GitHub] [kafka] mjsax commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r501306055



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -148,7 +148,7 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
     }
 
     void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                     final TimeoutException timeoutException,
+                                     final Exception cause,

Review comment:
       While the KIP itself only talks about `TimeoutException`, there could be other causes for not making progress, and thus we should be a little more general here.







[GitHub] [kafka] mjsax commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r499106137



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -431,6 +432,9 @@ public void restore() {
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
                 polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
+
+                // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
+                // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?

Review comment:
       For the global-thread, we also consider the case that `poll()` returns nothing. For the global-thread it's a little simpler though, as we restore on a per-partition basis.
   
   For the `StoreChangelogReader` it's more complicated:
    - for StandbyTasks, there might be nothing to be restored and thus getting no records might be fine (however, the "metadata" is not easily available as it's encapsulated in other classes...)
    - for active restoring tasks, it might be simpler...
   
   Was holding off, as I am not sure how important this case is, with regard to robustness. We would not crash for this case. I would still like to get this done, but not necessarily for `2.7` release though.







[GitHub] [kafka] mjsax merged pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9368:
URL: https://github.com/apache/kafka/pull/9368


   





[GitHub] [kafka] vvcephei commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r504792364



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -482,8 +486,10 @@ boolean tryToCompleteRestoration() {
                 if (restored.containsAll(task.changelogPartitions())) {
                     try {
                         task.completeRestoration();
-                    } catch (final TimeoutException e) {
-                        log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), e);
+                        task.clearTaskTimeout();
+                    } catch (final TimeoutException timeoutException) {
+                        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                        log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), timeoutException);

Review comment:
       ```suggestion
                           log.debug(String.format("Could not complete restoration for %s; will retry", task.id()), timeoutException);
   ```
   
   It might be a good idea to add tests for the log messages so we can tell if they're actually properly formatted or not. Hopefully, the log4j upgrade makes it easier to detect these logging bugs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -147,9 +147,12 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         topology.updateSourceTopics(nodeToSourceTopics);
     }
 
+    /**
+     * @throws TimeoutException if {@code currentWallClockMs > task-timeout-deadline}
+     */
     void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                     final TimeoutException timeoutException,
-                                     final Logger log) throws StreamsException {
+                                     final Exception cause,
+                                     final Logger log) {

Review comment:
       It seems like we ought to just define `log` at the AbstractTask level and avoid having two almost identical `maybeInitTaskTimeoutOrThrow` method definitions.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -148,7 +148,7 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
     }
 
     void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                     final TimeoutException timeoutException,
+                                     final Exception cause,

Review comment:
       Sounds good!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -454,18 +454,22 @@ private void addNewTask(final Task task) {
      * @throws StreamsException if the store's change log does not contain the partition
      * @return {@code true} if all tasks are fully restored
      */
-    boolean tryToCompleteRestoration() {
+    boolean tryToCompleteRestoration(final long now) {
         boolean allRunning = true;
 
         final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
             try {
                 task.initializeIfNeeded();
-            } catch (final LockException | TimeoutException e) {
+                task.clearTaskTimeout();
+            } catch (final LockException retriableException) {
                 // it is possible that if there are multiple threads within the instance that one thread
                 // trying to grab the task from the other, while the other has not released the lock since
                 // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), retriableException);

Review comment:
       ```suggestion
                   log.debug(String.format("Could not initialize %s due to the following exception; will retry", task.id()), retriableException);
   ```







[GitHub] [kafka] mjsax commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r501307743



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -625,11 +670,16 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
                     partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())),
                     new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED)
             );
-            return result.all().get().entrySet().stream().collect(
-                    Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
-        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
-            // if timeout exception gets thrown we just give up this time and retry in the next run loop
+
+            final Map<TopicPartition,  ListOffsetsResult.ListOffsetsResultInfo> resultPerPartition = result.all().get();
+            clearTaskTimeout(getTasksFromPartitions(tasks, partitions));
+
+            return resultPerPartition.entrySet().stream().collect(
+                    Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset())
+            );
+        } catch (final TimeoutException | InterruptedException | ExecutionException retriableException) {
             log.debug("Could not fetch all end offsets for {}, will retry in the next run loop", partitions);
+            maybeInitTaskTimeoutOrThrow(getTasksFromPartitions(tasks, partitions), retriableException);

Review comment:
       Here, we could also get different exceptions in addition to `TimeoutException` that we should just handle in the same way?
   
   Not sure about `InterruptedException`? Should this ever be thrown? Would it indicate a bug and we should just die?







[GitHub] [kafka] mjsax commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r499105545



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -694,7 +694,7 @@
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
-                    Duration.ofSeconds(5L).toMillis(),
+                    Duration.ofMinutes(5L).toMillis(),

Review comment:
       We got this wrong in the original PR...
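
For reference, the size of the difference between the two defaults can be checked with plain `java.time.Duration`. The class and method names below are made up for illustration and do not exist in Kafka.

```java
import java.time.Duration;

// Hypothetical helper class, used only to compare the two default values.
class TaskTimeoutDefault {
    // Value before this change: 5 seconds expressed in milliseconds.
    static long oldDefaultMs() {
        return Duration.ofSeconds(5L).toMillis();
    }

    // Value after this change: 5 minutes expressed in milliseconds.
    static long newDefaultMs() {
        return Duration.ofMinutes(5L).toMillis();
    }

    public static void main(final String[] args) {
        System.out.println("old default ms: " + oldDefaultMs());
        System.out.println("new default ms: " + newDefaultMs());
    }
}
```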







[GitHub] [kafka] mjsax commented on pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#issuecomment-708101388


   Updated this PR to add the missing test updates. Also rebased and squashed.
   
   Call for review @vvcephei 





[GitHub] [kafka] vvcephei commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r499812565



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -205,4 +207,9 @@ default boolean maybePunctuateSystemTime() {
         return false;
     }
 
+    void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
+                                     final TimeoutException timeoutException,
+                                     final Logger log) throws StreamsException;

Review comment:
       StreamsException is unchecked, right?
   
   It's better to document unchecked exceptions in the `@throws` javadoc tag. The `throws` keyword is for telling the compiler that you want callers instead of yourself to handle a _checked_ exception. I honestly have no idea why the java team chose to say "this is poor style" instead of just making it a compiler error, but that's the rationale. https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html#throwstag
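
A minimal sketch of the style described above, using a made-up class and method (`ThrowsTagExample.requireNonNegative` is hypothetical, not part of Kafka): the unchecked exception appears only in the `@throws` Javadoc tag, and the method signature has no `throws` clause.

```java
// Hypothetical example class; not part of Kafka.
class ThrowsTagExample {
    /**
     * Validates a timeout value.
     *
     * @param timeoutMs the timeout in milliseconds
     * @return the same value if it is valid
     * @throws IllegalArgumentException if {@code timeoutMs} is negative
     */
    static long requireNonNegative(final long timeoutMs) {
        // IllegalArgumentException is unchecked, so it is documented in the
        // Javadoc above rather than declared with the `throws` keyword.
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeoutMs must not be negative: " + timeoutMs);
        }
        return timeoutMs;
    }
}
```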

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -431,6 +432,9 @@ public void restore() {
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
                 polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
+
+                // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
+                // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?

Review comment:
       Thanks. I agree we can afford to leave this for future improvements.
   
   It does seem like we should have some kind of improvement in the future, though. Having a restore or standby-update fail indefinitely would be just as damaging to an application's robustness as having the main consumer fail indefinitely.
   
   Perhaps we can make some improvements to the Consumer first, though, so that we don't have to do so much guesswork to distinguish between "no records" and "no fetch".

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -137,4 +146,48 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         this.inputPartitions = topicPartitions;
         topology.updateSourceTopics(nodeToSourceTopics);
     }
+
+    @Override
+    public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
+                                            final TimeoutException timeoutException,
+                                            final Logger log) throws StreamsException {
+        if (deadlineMs == NO_DEADLINE) {
+            deadlineMs = currentWallClockMs + taskTimeoutMs;
+        } else if (currentWallClockMs > deadlineMs) {
+            final String errorMessage = String.format(
+                "Task %s did not make progress within %d ms. Adjust `%s` if needed.",
+                id,
+                currentWallClockMs - deadlineMs + taskTimeoutMs,
+                StreamsConfig.TASK_TIMEOUT_MS_CONFIG
+            );
+
+            if (timeoutException != null) {
+                throw new TimeoutException(errorMessage, timeoutException);
+            } else {
+                throw new TimeoutException(errorMessage);
+            }
+        }
+
+        if (timeoutException != null) {
+            log.debug(
+                "Timeout exception. Remaining time to deadline {}; retrying.",
+                deadlineMs - currentWallClockMs,
+                timeoutException
+            );

Review comment:
       This is the wrong format for this log message. The exception won't be logged. You have to format the string first:
   ```suggestion
               log.debug(
                   String.format("Timeout exception. Remaining time to deadline %d; retrying.",
                   deadlineMs - currentWallClockMs),
                   timeoutException
               );
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -137,4 +146,48 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         this.inputPartitions = topicPartitions;
         topology.updateSourceTopics(nodeToSourceTopics);
     }
+
+    @Override
+    public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
+                                            final TimeoutException timeoutException,
+                                            final Logger log) throws StreamsException {

Review comment:
       Sounds good. An alternative is to add an `abstract Logger log()` to AbstractTask's interface, which would make it clearer that the logger is still going to have the appropriate class name.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -694,7 +694,7 @@
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
             .define(TASK_TIMEOUT_MS_CONFIG,
                     Type.LONG,
-                    Duration.ofSeconds(5L).toMillis(),
+                    Duration.ofMinutes(5L).toMillis(),

Review comment:
       Oops! Good catch.



