Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/16 23:16:23 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

vvcephei commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r524564694



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -817,13 +817,15 @@ private void initializeMetadata() {
                     .filter(e -> e.getValue() != null)
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
             initializeTaskTime(offsetsAndMetadata);
-        } catch (final TimeoutException e) {
-            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
-                            "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
-                    e.toString(),
-                    ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-
-            throw e;
+        } catch (final TimeoutException timeoutException) {
+            log.warn(
+                "Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
+                    "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
+                time.toString(),
+                ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

Review comment:
       It might still be nice to see the stack trace here (even if it also gets logged elsewhere). If you do that, remember to switch to `String.format` for the variable substitution, so the exception itself can be passed as the logger's last argument.
   
   I don't feel strongly about it in this case, so I'll leave the decision to you.
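   
   A minimal sketch of what I mean, not the actual change: to stay self-contained it uses `java.util.logging` and a hypothetical `OffsetFetchLogging` class instead of the SLF4J logger in `StreamTask`. With SLF4J the equivalent call would be `log.warn(String.format(...), timeoutException)`.
   
   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;
   
   // Hypothetical stand-in class; java.util.logging is an assumption made only
   // to keep this sketch runnable without extra dependencies.
   class OffsetFetchLogging {
       private static final Logger LOG = Logger.getLogger("OffsetFetchLogging");
   
       // Builds the message eagerly with String.format, because the throwable
       // now takes the logger's last argument instead of a placeholder slot.
       static String buildMessage(final RuntimeException timeoutException, final String configName) {
           return String.format(
               "Encountered %s while trying to fetch committed offsets, will retry initializing the metadata in the next loop."
                   + "\nConsider overwriting consumer config %s to a larger value to avoid timeout errors",
               timeoutException,
               configName);
       }
   
       static void warnWithStackTrace(final RuntimeException timeoutException, final String configName) {
           // Passing the exception object (not its toString()) lets the logging
           // backend print the stack trace along with the message.
           LOG.log(Level.WARNING, buildMessage(timeoutException, configName), timeoutException);
       }
   }
   ```
   
   In the real code the second argument would be `ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG`.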

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1029,20 +1048,40 @@ int commit(final Collection<Task> tasksToCommit) {
             return -1;
         } else {
             int committed = 0;
-            final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
+            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+            final Iterator<Task> it = tasksToCommit.iterator();
+            while (it.hasNext()) {
+                final Task task = it.next();
                 if (task.commitNeeded()) {
                     final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
                     if (task.isActive()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), offsetAndMetadata);
+                        consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
                     }
+                } else {
+                    it.remove();
                 }
             }
 
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            final Set<Task> uncommittedTasks = new HashSet<>();
+            try {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+                tasksToCommit.forEach(Task::clearTaskTimeout);
+            } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
+                final TimeoutException timeoutException = taskTimeoutExceptions.timeoutException();
+                if (timeoutException != null) {
+                    tasksToCommit.forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
+                    uncommittedTasks.addAll(tasksToCommit);
+                } else {
+                    for (final Map.Entry<Task, TimeoutException> timeoutExceptions : taskTimeoutExceptions.exceptions().entrySet()) {
+                        final Task task = timeoutExceptions.getKey();
+                        task.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutExceptions.getValue());
+                        uncommittedTasks.add(task);
+                    }
+                }
+            }
 
             for (final Task task : tasksToCommit) {
-                if (task.commitNeeded()) {
+                if (!uncommittedTasks.contains(task)) {
                     ++committed;
                     task.postCommit(false);

Review comment:
       Maybe we should move `clearTaskTimeout` here, to cover the case where some of the tasks timed out but not all of them?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1029,20 +1048,40 @@ int commit(final Collection<Task> tasksToCommit) {
             return -1;
         } else {
             int committed = 0;
-            final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
+            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+            final Iterator<Task> it = tasksToCommit.iterator();

Review comment:
       I was initially worried about potential side-effects of removing from the input collection, but on second thought, maybe it's reasonable (considering the usage of this method) to assume that the input collection is always a copy already.
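   
   To make the side effect concrete, here is a small sketch with plain integers instead of tasks (the class and method names are hypothetical): removing through the iterator changes the caller's collection, and it fails outright if the caller passed an unmodifiable one.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.List;
   
   // Hypothetical illustration; integers stand in for tasks.
   class InputCollectionPruning {
       // Drops odd numbers from the given collection in place, the same way
       // the diff drops tasks that do not need a commit.
       static void pruneInPlace(final Collection<Integer> values) {
           final Iterator<Integer> it = values.iterator();
           while (it.hasNext()) {
               if (it.next() % 2 != 0) {
                   it.remove();
               }
           }
       }
   
       // Returns true if pruning an unmodifiable view is rejected.
       static boolean failsOnUnmodifiable() {
           final List<Integer> readOnly =
               Collections.unmodifiableList(new ArrayList<>(Arrays.asList(1, 2, 3)));
           try {
               pruneInPlace(readOnly);
               return false;
           } catch (final UnsupportedOperationException expected) {
               return true;
           }
       }
   }
   ```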
   
   Still, it looks like we could actually simplify the implementation a little by _not_ reusing the input collection but instead only relying on the key set of `consumedOffsetsAndMetadataPerTask` as the list of "tasks to commit".
   
   Here's what I'm thinking:
   ```java
   int committed = 0;
   final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
   for (final Task task : tasksToCommit) {
       if (task.commitNeeded()) {
           final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
           if (task.isActive()) {
               consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
           }
       }
   }
   
   final Set<Task> uncommittedTasks = new HashSet<>();
   try {
       commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
   } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
       final TimeoutException timeoutException = taskTimeoutExceptions.timeoutException();
       if (timeoutException != null) {
           consumedOffsetsAndMetadataPerTask.keySet().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
           uncommittedTasks.addAll(tasksToCommit);
       } else {
           for (final Map.Entry<Task, TimeoutException> timeoutExceptions : taskTimeoutExceptions.exceptions().entrySet()) {
               final Task task = timeoutExceptions.getKey();
               task.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutExceptions.getValue());
               uncommittedTasks.add(task);
           }
       }
   }
   
   for (final Task task : tasksToCommit) {
       if (!uncommittedTasks.contains(task)) {
           ++committed;
           task.postCommit(false);
           task.clearTaskTimeout();
       }
   }
   
   return committed;
   ```
   
   With the current version of the PR, it struck me as potentially incorrect to
   * call `maybeInitTaskTimeoutOrThrow` for standby tasks, which take no part in the offset commit
   * not clear the timeout for tasks that committed successfully when other tasks timed out
   
   But I'm also not sure how much either of these matters in practice.
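   
   To spell out the bookkeeping behind those two points, here is a toy model. Everything in it is a hypothetical stand-in (`SketchTask`, the `timedOut` set that simulates which commits fail), and unlike my snippet above it only post-commits tasks that were actually prepared, which is an extra assumption on my part.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   class CommitBookkeepingSketch {
       // Hypothetical, heavily simplified stand-in for a Streams task.
       static final class SketchTask {
           final String id;
           final boolean active;
           boolean commitNeeded;
           boolean timeoutRunning;
   
           SketchTask(final String id, final boolean active,
                      final boolean commitNeeded, final boolean timeoutRunning) {
               this.id = id;
               this.active = active;
               this.commitNeeded = commitNeeded;
               this.timeoutRunning = timeoutRunning;
           }
       }
   
       // `timedOut` simulates which tasks would hit a timeout during the commit.
       static int commit(final List<SketchTask> tasksToCommit, final Set<SketchTask> timedOut) {
           final List<SketchTask> prepared = new ArrayList<>();
           final Set<SketchTask> uncommitted = new HashSet<>();
           for (final SketchTask task : tasksToCommit) {
               if (task.commitNeeded) {
                   prepared.add(task);
                   // Only active tasks take part in the offset commit, so a
                   // standby task never gets a timeout started here.
                   if (task.active && timedOut.contains(task)) {
                       task.timeoutRunning = true; // stands in for maybeInitTaskTimeoutOrThrow
                       uncommitted.add(task);
                   }
               }
           }
   
           int committed = 0;
           for (final SketchTask task : prepared) {
               if (!uncommitted.contains(task)) {
                   ++committed;
                   task.commitNeeded = false;   // stands in for postCommit
                   task.timeoutRunning = false; // stands in for clearTaskTimeout, applied per task
               }
           }
           return committed;
       }
   }
   ```
   
   With one active task that succeeds, one active task that times out, and one standby task, the model counts two commits, clears the timeout of the successful active task, and leaves only the failed task with a running timeout.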

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -262,13 +263,21 @@ void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
     /**
      * @throws IllegalStateException if EOS is disabled
      */
-    void abortTransaction() throws ProducerFencedException {
+    void abortTransaction() {
         if (!eosEnabled()) {
             throw new IllegalStateException(formatException("Exactly-once is not enabled"));
         }
         if (transactionInFlight) {
             try {
                 producer.abortTransaction();
+            } catch (final TimeoutException logAndSwallow) {
+                // no need to re-throw because we abort a TX only if we close a task dirty,
+                // and thus `task.timeout.ms` does not apply
+                log.debug(

Review comment:
       I'm wondering if we should log this at `info` or `warn` level instead. It doesn't seem like it would be very verbose, and it might be nice to see by default, because the timeout will have secondary effects later on, when we try to start a new transaction and get blocked.
   
   But I also don't feel strongly about it, so I leave it to your discretion.
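   
   Roughly what I have in mind, as a sketch only: `SketchProducer` and `SketchTimeoutException` are hypothetical stand-ins for the real producer and Kafka's `TimeoutException`, and `java.util.logging` replaces the SLF4J logger so the snippet stays self-contained.
   
   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;
   
   class AbortLoggingSketch {
       private static final Logger LOG = Logger.getLogger("AbortLoggingSketch");
   
       // Hypothetical unchecked timeout, standing in for Kafka's TimeoutException.
       static final class SketchTimeoutException extends RuntimeException {
           SketchTimeoutException(final String message) {
               super(message);
           }
       }
   
       // Hypothetical minimal producer interface.
       interface SketchProducer {
           void abortTransaction();
       }
   
       // Returns true if the abort completed, false if it timed out and was swallowed.
       static boolean abortQuietly(final SketchProducer producer) {
           try {
               producer.abortTransaction();
               return true;
           } catch (final SketchTimeoutException logAndSwallow) {
               // Logged at WARNING rather than a debug level, so the swallowed
               // timeout is still visible when a later transaction gets blocked.
               LOG.log(Level.WARNING,
                   "Aborting the transaction timed out; a later transaction may be blocked",
                   logAndSwallow);
               return false;
           }
       }
   }
   ```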



