Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/14 16:03:22 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks

vvcephei commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r504792364



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -482,8 +486,10 @@ boolean tryToCompleteRestoration() {
                 if (restored.containsAll(task.changelogPartitions())) {
                     try {
                         task.completeRestoration();
-                    } catch (final TimeoutException e) {
-                        log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), e);
+                        task.clearTaskTimeout();
+                    } catch (final TimeoutException timeoutException) {
+                        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                        log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), timeoutException);

Review comment:
   ```suggestion
                           log.debug(String.format("Could not complete restoration for %s; will retry", task.id()), timeoutException);
   ```
   
   It might be a good idea to add tests for the log messages so we can tell whether they're actually formatted properly. Hopefully, the log4j upgrade will make these logging bugs easier to detect.
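One way to act on the testing suggestion is to capture log records and assert on both the rendered message and the attached exception. Kafka Streams logs through SLF4J, so this is only an analogue built on `java.util.logging` to stay dependency-free; `CapturingHandler`, `logRetry`, and `captureRetryLog` are hypothetical names. It checks the two properties the suggested `String.format(...)` form is meant to guarantee: the task id lands in the message text, and the exception travels as the separate throwable argument instead of being matched against a `{}` placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogMessageFormatTest {

    // Handler that keeps every published record so a test can inspect it.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(final LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Hypothetical production call: pre-format the message, pass the exception separately.
    static void logRetry(final Logger log, final String taskId, final Exception cause) {
        log.log(Level.FINE, String.format("Could not complete restoration for %s; will retry", taskId), cause);
    }

    // Attaches a capturing handler, runs logRetry once, and returns the single record it produced.
    static LogRecord captureRetryLog(final String taskId, final Exception cause) {
        final Logger log = Logger.getLogger("log-format-test");
        log.setUseParentHandlers(false);   // keep console output out of the test
        log.setLevel(Level.ALL);           // FINE is below the default INFO threshold
        final CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);
        try {
            logRetry(log, taskId, cause);
        } finally {
            log.removeHandler(handler);
        }
        if (handler.records.size() != 1) {
            throw new AssertionError("expected exactly one record, got " + handler.records.size());
        }
        return handler.records.get(0);
    }

    public static void main(final String[] args) {
        final Exception cause = new RuntimeException("broker unavailable");
        final LogRecord record = captureRetryLog("0_1", cause);
        if (!record.getMessage().equals("Could not complete restoration for 0_1; will retry")) {
            throw new AssertionError("unexpected message: " + record.getMessage());
        }
        if (record.getThrown() != cause) {
            throw new AssertionError("exception was not attached to the record");
        }
        System.out.println("log message formatted as expected");
    }
}
```

With log4j, an appender that records logging events would play the same role as `CapturingHandler`.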

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -147,9 +147,12 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         topology.updateSourceTopics(nodeToSourceTopics);
     }
 
+    /**
+     * @throws TimeoutException if {@code currentWallClockMs > task-timeout-deadline}
+     */
     void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                     final TimeoutException timeoutException,
-                                     final Logger log) throws StreamsException {
+                                     final Exception cause,
+                                     final Logger log) {

Review comment:
       It seems like we ought to just define `log` at the AbstractTask level and avoid having two almost identical `maybeInitTaskTimeoutOrThrow` method definitions.
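The suggestion can be sketched as a base class that owns both the logger and a single timeout method, so subclasses stop carrying their own copies. This is a hypothetical, self-contained illustration: the method names `maybeInitTaskTimeoutOrThrow` and `clearTaskTimeout` come from the diff, but the deadline bookkeeping (`taskTimeoutMs`, `NO_DEADLINE`, `timeoutInProgress`), the stand-in `TimeoutException`, and the use of `java.util.logging` instead of SLF4J are assumptions, not the PR's actual code. It follows the javadoc in the diff: the first failure starts the clock, and a later failure throws once the wall-clock time is past the deadline.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskTimeoutSketch {

    // Dependency-free stand-in for Kafka's unchecked TimeoutException.
    static class TimeoutException extends RuntimeException {
        TimeoutException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical base class: the logger and the timeout bookkeeping are defined once here,
    // so every subclass inherits a single implementation.
    abstract static class AbstractTask {
        private static final long NO_DEADLINE = -1L;

        protected final Logger log;
        private final String id;
        private final long taskTimeoutMs;        // assumed per-task timeout budget
        private long deadlineMs = NO_DEADLINE;

        AbstractTask(final String id, final long taskTimeoutMs) {
            this.id = id;
            this.taskTimeoutMs = taskTimeoutMs;
            this.log = Logger.getLogger(getClass().getName() + "." + id);
        }

        /**
         * Starts the task timeout on the first failure; on later failures,
         * throws once {@code currentWallClockMs} is past the deadline.
         */
        void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, final Exception cause) {
            if (deadlineMs == NO_DEADLINE) {
                deadlineMs = currentWallClockMs + taskTimeoutMs;
                log.log(Level.FINE, String.format("Task %s failed; starting %d ms task timeout", id, taskTimeoutMs), cause);
            } else if (currentWallClockMs > deadlineMs) {
                throw new TimeoutException(
                    String.format("Task %s did not make progress within %d ms", id, taskTimeoutMs), cause);
            }
        }

        /** Called after any successful step so the next failure starts a fresh timeout. */
        void clearTaskTimeout() {
            deadlineMs = NO_DEADLINE;
        }

        boolean timeoutInProgress() {
            return deadlineMs != NO_DEADLINE;
        }
    }

    // The subclass no longer declares its own logger or timeout method.
    static final class StandbyTask extends AbstractTask {
        StandbyTask(final String id, final long taskTimeoutMs) {
            super(id, taskTimeoutMs);
        }
    }

    public static void main(final String[] args) {
        final StandbyTask task = new StandbyTask("0_1", 100L);
        final Exception cause = new RuntimeException("retriable client error");
        task.maybeInitTaskTimeoutOrThrow(1_000L, cause);   // starts the timer; deadline = 1_100
        task.maybeInitTaskTimeoutOrThrow(1_100L, cause);   // still within the deadline
        try {
            task.maybeInitTaskTimeoutOrThrow(1_101L, cause);
            System.out.println("unexpected: no timeout");
        } catch (final TimeoutException expected) {
            System.out.println("timed out: " + expected.getMessage());
        }
    }
}
```

Once `log` lives on the base class, the `Logger` parameter can be dropped from the method signature as well.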

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -148,7 +148,7 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
     }
 
     void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                     final TimeoutException timeoutException,
+                                     final Exception cause,

Review comment:
       Sounds good!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -454,18 +454,22 @@ private void addNewTask(final Task task) {
      * @throws StreamsException if the store's change log does not contain the partition
      * @return {@code true} if all tasks are fully restored
      */
-    boolean tryToCompleteRestoration() {
+    boolean tryToCompleteRestoration(final long now) {
         boolean allRunning = true;
 
         final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
             try {
                 task.initializeIfNeeded();
-            } catch (final LockException | TimeoutException e) {
+                task.clearTaskTimeout();
+            } catch (final LockException retriableException) {
                 // it is possible that if there are multiple threads within the instance that one thread
                 // trying to grab the task from the other, while the other has not released the lock since
                 // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), retriableException);

Review comment:
   ```suggestion
                   log.debug(String.format("Could not initialize %s due to the following exception; will retry", task.id()), retriableException);
   ```
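The retry flow that `tryToCompleteRestoration(now)` is moving toward can be illustrated with a small, self-contained loop. Everything here is hypothetical: `Task`, `tryToInitializeAll`, and the exception stand-ins only mimic the shape of the diff. The hunk above is cut off after the `LockException` branch, so this sketch assumes that a `LockException` is simply logged and retried, while a `TimeoutException` starts (or checks) the per-task timeout the same way the `completeRestoration()` hunk does; a successful `initializeIfNeeded()` clears the timer. A `List<String>` stands in for the debug log.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestorationLoopSketch {

    // Dependency-free stand-ins for Kafka's LockException and TimeoutException.
    static class LockException extends RuntimeException {
        LockException(final String message) { super(message); }
    }

    static class TimeoutException extends RuntimeException {
        TimeoutException(final String message, final Throwable cause) { super(message, cause); }
    }

    // Hypothetical task: initialization fails a fixed number of times, then succeeds.
    static final class Task {
        private static final long NO_DEADLINE = -1L;
        final String id;
        private int lockFailures;
        private int timeoutFailures;
        private final long taskTimeoutMs;
        private long deadlineMs = NO_DEADLINE;
        private boolean initialized = false;

        Task(final String id, final int lockFailures, final int timeoutFailures, final long taskTimeoutMs) {
            this.id = id;
            this.lockFailures = lockFailures;
            this.timeoutFailures = timeoutFailures;
            this.taskTimeoutMs = taskTimeoutMs;
        }

        void initializeIfNeeded() {
            if (initialized) {
                return;
            }
            if (lockFailures > 0) {
                lockFailures--;
                throw new LockException("state directory of " + id + " is still locked");
            }
            if (timeoutFailures > 0) {
                timeoutFailures--;
                throw new TimeoutException("client request timed out for " + id, null);
            }
            initialized = true;
        }

        void clearTaskTimeout() { deadlineMs = NO_DEADLINE; }

        boolean timeoutInProgress() { return deadlineMs != NO_DEADLINE; }

        void maybeInitTaskTimeoutOrThrow(final long now, final Exception cause) {
            if (deadlineMs == NO_DEADLINE) {
                deadlineMs = now + taskTimeoutMs;   // first failure: start the clock
            } else if (now > deadlineMs) {
                throw new TimeoutException("task " + id + " made no progress within " + taskTimeoutMs + " ms", cause);
            }
        }
    }

    /** One pass over all tasks; returns true only if every task is initialized. */
    static boolean tryToInitializeAll(final Map<String, Task> tasks, final long now, final List<String> debugLog) {
        boolean allRunning = true;
        for (final Task task : tasks.values()) {
            try {
                task.initializeIfNeeded();
                task.clearTaskTimeout();   // progress was made, so reset the timer
            } catch (final LockException retriableException) {
                // Another thread may still hold the lock; retry on the next pass.
                // The list stands in for log.debug(message, retriableException).
                debugLog.add(String.format("Could not initialize %s due to the following exception; will retry", task.id));
                allRunning = false;
            } catch (final TimeoutException timeoutException) {
                // Start or check the task timeout; this throws once the deadline has passed.
                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
                allRunning = false;
            }
        }
        return allRunning;
    }

    public static void main(final String[] args) {
        final Map<String, Task> tasks = new LinkedHashMap<>();
        tasks.put("0_0", new Task("0_0", 1, 0, 1_000L));   // locked once
        tasks.put("0_1", new Task("0_1", 0, 2, 1_000L));   // times out twice
        final List<String> debugLog = new ArrayList<>();
        long now = 0L;
        while (!tryToInitializeAll(tasks, now, debugLog)) {
            now += 100L;   // simulated wall clock
        }
        System.out.println("all tasks initialized at t=" + now + " ms; debug lines: " + debugLog.size());
    }
}
```

Running `main` advances a simulated clock in 100 ms steps until every task is initialized; a task that keeps timing out past its budget would instead make `tryToInitializeAll` throw.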




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org