Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/06 05:57:50 UTC

[GitHub] [kafka] mjsax opened a new pull request #9570: KAFKA-9274: Handle TimeoutException on commit

mjsax opened a new pull request #9570:
URL: https://github.com/apache/kafka/pull/9570


    - part of KIP-572
    - when Kafka Streams commits a task, a `TimeoutException` should not kill
      the thread; instead, `task.timeout.ms` should be triggered and the commit
      should be retried in the next loop iteration
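A minimal sketch of the intended retry semantics, assuming a per-task deadline that starts at the first timeout and is reset by the next successful commit. `TaskTimeoutSketch`, `TaskTimeoutTracker`, and the local `CommitTimeoutException` are hypothetical names, loosely modeled on the `maybeInitTaskTimeoutOrThrow(...)` and `clearTaskTimeout()` calls in the `TaskManager` diff further down. This is not the Kafka Streams implementation.

```java
/** Hypothetical sketch of per-task `task.timeout.ms` bookkeeping; not the Kafka Streams code. */
public class TaskTimeoutSketch {
    /** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static final class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(final String message) {
            super(message);
        }
    }

    /** Tracks one task's deadline, assuming it starts at the first timeout. */
    static final class TaskTimeoutTracker {
        private static final long NO_DEADLINE = -1L;

        private final long taskTimeoutMs;   // assumed value of `task.timeout.ms`
        private long deadlineMs = NO_DEADLINE;

        TaskTimeoutTracker(final long taskTimeoutMs) {
            this.taskTimeoutMs = taskTimeoutMs;
        }

        /** On a timed-out commit: start the deadline, or rethrow once it has expired. */
        void maybeInitTaskTimeoutOrThrow(final long nowMs, final CommitTimeoutException cause) {
            if (deadlineMs == NO_DEADLINE) {
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs > deadlineMs) {
                throw new IllegalStateException(
                    "Task made no progress within " + taskTimeoutMs + " ms", cause);
            }
            // Otherwise swallow the timeout; the caller retries the commit in the next loop.
        }

        /** After a successful commit the task made progress, so the deadline is reset. */
        void clearTaskTimeout() {
            deadlineMs = NO_DEADLINE;
        }

        boolean timeoutActive() {
            return deadlineMs != NO_DEADLINE;
        }
    }
}
```

The thread stays alive while the deadline is running, and only a task that makes no progress for longer than `task.timeout.ms` surfaces an error.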
   
   Call for review @vvcephei 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539527



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -164,9 +164,9 @@ public InternalTopicManager(final Time time,
                                 "Error message was: {}", topicName, cause.toString());
                             throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                         }
-                    } catch (final TimeoutException retryableException) {
+                    } catch (final TimeoutException retriableException) {

Review comment:
       Just fixing a naming issue.







[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r525340385



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -817,13 +817,15 @@ private void initializeMetadata() {
                     .filter(e -> e.getValue() != null)
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
             initializeTaskTime(offsetsAndMetadata);
-        } catch (final TimeoutException e) {
-            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
-                            "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
-                    e.toString(),
-                    ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-
-            throw e;
+        } catch (final TimeoutException timeoutException) {
+            log.warn(
+                "Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
+                    "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
+                time.toString(),
+                ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

Review comment:
       I guess I'll leave it as-is.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -262,13 +263,21 @@ void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
     /**
      * @throws IllegalStateException if EOS is disabled
      */
-    void abortTransaction() throws ProducerFencedException {
+    void abortTransaction() {
         if (!eosEnabled()) {
             throw new IllegalStateException(formatException("Exactly-once is not enabled"));
         }
         if (transactionInFlight) {
             try {
                 producer.abortTransaction();
+            } catch (final TimeoutException logAndSwallow) {
+                // no need to re-throw because we abort a TX only if we close a task dirty,
+                // and thus `task.timeout.ms` does not apply
+                log.debug(

Review comment:
       Ack. `warn` it is.







[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539735



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -111,6 +111,8 @@ public void initialize() {
             try {
                 partitions = streamsProducer.partitionsFor(topic);
             } catch (final KafkaException e) {
+                // TODO: KIP-572 need to handle `TimeoutException`
+                // -> should we throw a `TaskCorruptedException` for this case to reset the task and retry (including triggering `task.timeout.ms`) ?

Review comment:
       This is an open question, and input is welcome. I would like to tackle it in a follow-up PR.







[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540480



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -997,7 +1016,7 @@ void shutdown(final boolean clean) {
 
     // For testing only.
     int commitAll() {
-        return commit(tasks.values());
+        return commit(new HashSet<>(tasks.values()));

Review comment:
       Need to make a copy (as all production calls do, too).
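Why the copy matters: the new `commit()` drops tasks that need no commit via `it.remove()` on the collection it receives, and `HashMap.values()` is a live view backed by the map, so removing through its iterator deletes the mapping itself. Below is a self-contained illustration. Plain `String` values stand in for tasks, and the `idle-` prefix is an arbitrary stand-in for `!task.commitNeeded()`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;

public class ValuesViewDemo {
    // Mimics the pruning loop in the new commit(): drop entries that need no commit.
    static void pruneTasksWithoutCommit(final Collection<String> tasksToCommit) {
        final Iterator<String> it = tasksToCommit.iterator();
        while (it.hasNext()) {
            if (it.next().startsWith("idle")) {   // stand-in for !task.commitNeeded()
                it.remove();
            }
        }
    }

    static Map<String, String> sampleTasks() {
        final Map<String, String> tasks = new HashMap<>();
        tasks.put("0_0", "busy-0_0");
        tasks.put("0_1", "idle-0_1");
        return tasks;
    }

    public static void main(final String[] args) {
        final Map<String, String> viaView = sampleTasks();
        pruneTasksWithoutCommit(viaView.values());                 // removes from the map itself
        System.out.println(viaView.size());                        // 1

        final Map<String, String> viaCopy = sampleTasks();
        pruneTasksWithoutCommit(new HashSet<>(viaCopy.values()));  // map is left untouched
        System.out.println(viaCopy.size());                        // 2
    }
}
```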







[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r525343321



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1029,20 +1048,40 @@ int commit(final Collection<Task> tasksToCommit) {
             return -1;
         } else {
             int committed = 0;
-            final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
+            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+            final Iterator<Task> it = tasksToCommit.iterator();

Review comment:
       Good catch!







[GitHub] [kafka] vvcephei commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r524564694



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -817,13 +817,15 @@ private void initializeMetadata() {
                     .filter(e -> e.getValue() != null)
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
             initializeTaskTime(offsetsAndMetadata);
-        } catch (final TimeoutException e) {
-            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
-                            "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
-                    e.toString(),
-                    ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-
-            throw e;
+        } catch (final TimeoutException timeoutException) {
+            log.warn(
+                "Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
+                    "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
+                time.toString(),
+                ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

Review comment:
       It might still be nice to see the stack trace here (even if it also gets logged elsewhere). If you want to do it, don't forget that you have to switch to `String.format` for the variable substitution.
   
   I don't feel strongly in this case, so I'll defer to you whether you want to do this or not.
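A sketch of the `String.format` approach suggested above. To keep it self-contained, `java.util.logging` is swapped in for the logger Kafka Streams actually uses, and `TimeoutLogging`/`logFetchTimeout` are hypothetical names. The message is built eagerly, and the exception is passed separately as the `Throwable` argument so that its stack trace is recorded.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class TimeoutLogging {
    private static final Logger LOG = Logger.getLogger(TimeoutLogging.class.getName());

    /**
     * Hypothetical helper: formats the message up front with String.format and passes the
     * exception as the Throwable argument, so the logger records the full stack trace.
     */
    static void logFetchTimeout(final Exception timeoutException, final String apiTimeoutConfig) {
        final String message = String.format(
            "Encountered %s while trying to fetch committed offsets, will retry initializing the metadata in the next loop."
                + "\nConsider overwriting consumer config %s to a larger value to avoid timeout errors",
            timeoutException,
            apiTimeoutConfig);
        LOG.log(Level.WARNING, message, timeoutException);
    }
}
```

With SLF4J 1.6.0 or later, an exception passed as an extra trailing argument after the placeholder arguments is also logged with its stack trace. That makes `String.format` one option rather than the only one.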

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1029,20 +1048,40 @@ int commit(final Collection<Task> tasksToCommit) {
             return -1;
         } else {
             int committed = 0;
-            final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
+            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+            final Iterator<Task> it = tasksToCommit.iterator();
+            while (it.hasNext()) {
+                final Task task = it.next();
                 if (task.commitNeeded()) {
                     final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
                     if (task.isActive()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), offsetAndMetadata);
+                        consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
                     }
+                } else {
+                    it.remove();
                 }
             }
 
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            final Set<Task> uncommittedTasks = new HashSet<>();
+            try {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+                tasksToCommit.forEach(Task::clearTaskTimeout);
+            } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
+                final TimeoutException timeoutException = taskTimeoutExceptions.timeoutException();
+                if (timeoutException != null) {
+                    tasksToCommit.forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
+                    uncommittedTasks.addAll(tasksToCommit);
+                } else {
+                    for (final Map.Entry<Task, TimeoutException> timeoutExceptions : taskTimeoutExceptions.exceptions().entrySet()) {
+                        final Task task = timeoutExceptions.getKey();
+                        task.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutExceptions.getValue());
+                        uncommittedTasks.add(task);
+                    }
+                }
+            }
 
             for (final Task task : tasksToCommit) {
-                if (task.commitNeeded()) {
+                if (!uncommittedTasks.contains(task)) {
                     ++committed;
                     task.postCommit(false);

Review comment:
       Maybe we should move `clearTaskTimeout` here, in case some of the tasks timed out but not all?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1029,20 +1048,40 @@ int commit(final Collection<Task> tasksToCommit) {
             return -1;
         } else {
             int committed = 0;
-            final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            for (final Task task : tasksToCommit) {
+            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+            final Iterator<Task> it = tasksToCommit.iterator();

Review comment:
       I was initially worried about potential side-effects of removing from the input collection, but on second thought, maybe it's reasonable (considering the usage of this method) to assume that the input collection is always a copy already.
   
   Still, it looks like we could actually simplify the implementation a little by _not_ reusing the input collection but instead only relying on the key set of `consumedOffsetsAndMetadataPerTask` as the list of "tasks to commit".
   
   Here's what I'm thinking:
   ```java
   int committed = 0;
   final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
   for (final Task task : tasksToCommit) {
       if (task.commitNeeded()) {
           final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
           if (task.isActive()) {
               consumedOffsetsAndMetadataPerTask.put(task, offsetAndMetadata);
           }
       }
   }
   
   final Set<Task> uncommittedTasks = new HashSet<>();
   try {
       commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
   } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {
       final TimeoutException timeoutException = taskTimeoutExceptions.timeoutException();
       if (timeoutException != null) {
           consumedOffsetsAndMetadataPerTask.keySet().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
           uncommittedTasks.addAll(tasksToCommit);
       } else {
           for (final Map.Entry<Task, TimeoutException> timeoutExceptions : taskTimeoutExceptions.exceptions().entrySet()) {
               final Task task = timeoutExceptions.getKey();
               task.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutExceptions.getValue());
               uncommittedTasks.add(task);
           }
       }
   }
   
   for (final Task task : tasksToCommit) {
       // the input is no longer pruned, so skip tasks that did not need a commit
       if (task.commitNeeded() && !uncommittedTasks.contains(task)) {
           ++committed;
           task.postCommit(false);
           task.clearTaskTimeout();
       }
   }
   
   return committed;
   ```
   
   It struck me as potentially incorrect to:
   * call `maybeInitTaskTimeoutOrThrow` for standby tasks
   * not clear the timeout for successful tasks when other tasks timed out
   
   But I'm also not sure whether these are big deals or not.
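The two concerns above can be illustrated with a small, self-contained sketch. `SketchTask` and `finishCommit` are hypothetical stand-ins that use boolean and counter fields in place of the real `maybeInitTaskTimeoutOrThrow`, `postCommit`, and `clearTaskTimeout` calls. On a shared timeout, only active tasks start their timeout. After a partial failure, each successfully committed task clears its own timeout.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; SketchTask is not the real Kafka Streams Task interface. */
public class PartialCommitSketch {
    static final class SketchTask {
        final String id;
        final boolean active;            // standby tasks have no consumed offsets to commit
        boolean timeoutPending = false;  // stand-in for an initialized task timeout
        int postCommits = 0;

        SketchTask(final String id, final boolean active) {
            this.id = id;
            this.active = active;
        }
    }

    /**
     * @param tasksToCommit tasks that needed a commit in this round
     * @param allTimedOut   true if the single shared commit timed out
     * @param timedOutTasks tasks whose individual commit timed out (per-task commits)
     * @return the number of tasks that were committed
     */
    static int finishCommit(final List<SketchTask> tasksToCommit,
                            final boolean allTimedOut,
                            final Set<SketchTask> timedOutTasks) {
        final Set<SketchTask> uncommittedTasks = new HashSet<>();
        if (allTimedOut) {
            for (final SketchTask task : tasksToCommit) {
                if (task.active) {               // skip standbys: they were not part of the commit
                    task.timeoutPending = true;  // stand-in for maybeInitTaskTimeoutOrThrow(...)
                }
            }
            uncommittedTasks.addAll(tasksToCommit);
        } else {
            for (final SketchTask task : timedOutTasks) {
                task.timeoutPending = true;
                uncommittedTasks.add(task);
            }
        }

        int committed = 0;
        for (final SketchTask task : tasksToCommit) {
            if (!uncommittedTasks.contains(task)) {
                ++committed;
                task.postCommits++;              // stand-in for postCommit(false)
                task.timeoutPending = false;     // stand-in for clearTaskTimeout(), per task
            }
        }
        return committed;
    }
}
```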

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -262,13 +263,21 @@ void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
     /**
      * @throws IllegalStateException if EOS is disabled
      */
-    void abortTransaction() throws ProducerFencedException {
+    void abortTransaction() {
         if (!eosEnabled()) {
             throw new IllegalStateException(formatException("Exactly-once is not enabled"));
         }
         if (transactionInFlight) {
             try {
                 producer.abortTransaction();
+            } catch (final TimeoutException logAndSwallow) {
+                // no need to re-throw because we abort a TX only if we close a task dirty,
+                // and thus `task.timeout.ms` does not apply
+                log.debug(

Review comment:
       I'm wondering if we should make this `info` or `warn` level. It doesn't seem like it would be very verbose, and it might be nice to see by default because it will have secondary effects later on when we try to start a new transaction, but get blocked.
   
   But I also don't feel strongly about it, so I leave it to your discretion.
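A minimal sketch of the log-and-swallow path at `WARN`, the level the PR author settled on. The `TransactionalProducer` interface, the local `AbortTimeoutException`, the use of `java.util.logging` instead of the Streams logger, and resetting the in-flight flag after a timeout are all assumptions made to keep the example self-contained.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class AbortOnDirtyClose {
    private static final Logger LOG = Logger.getLogger(AbortOnDirtyClose.class.getName());

    /** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static final class AbortTimeoutException extends RuntimeException {
        AbortTimeoutException(final String message) {
            super(message);
        }
    }

    /** Hypothetical minimal view of a transactional producer. */
    interface TransactionalProducer {
        void abortTransaction();
    }

    private final TransactionalProducer producer;
    private boolean transactionInFlight = true;

    AbortOnDirtyClose(final TransactionalProducer producer) {
        this.producer = producer;
    }

    void abortTransaction() {
        if (transactionInFlight) {
            try {
                producer.abortTransaction();
            } catch (final AbortTimeoutException logAndSwallow) {
                // Only reached when a task is closed dirty, so `task.timeout.ms` does not apply;
                // WARN keeps the event visible by default in case a later transaction is blocked.
                LOG.log(Level.WARNING, "Timed out while aborting the transaction; swallowing since the task is closed dirty", logAndSwallow);
            }
            transactionInFlight = false;   // assumption: the flag is reset even after a timeout
        }
    }

    boolean transactionInFlight() {
        return transactionInFlight;
    }
}
```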








[GitHub] [kafka] mjsax merged pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9570:
URL: https://github.com/apache/kafka/pull/9570


   





[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539800



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -202,6 +204,10 @@ private void recordSendError(final String topic, final Exception exception, fina
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, exception));
         } else {
+            // TODO: KIP-572 handle `TimeoutException extends RetriableException`

Review comment:
       As above: this is an open question, and input is welcome. I would like to tackle it in a follow-up PR.







[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518539416



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/TaskTimeoutExceptions.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.streams.processor.internals.Task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskTimeoutExceptions extends StreamsException {

Review comment:
       If we commit multiple tasks individually (i.e., eos-alpha), we use this class as an "exception container" to track the `TimeoutException` for each failed task.
   
   To simplify the caller code, we also wrap a single `TimeoutException` if we commit all tasks at once (at-least-once, eos-beta).
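A hypothetical sketch of such a container. Java does not allow a generic class to extend `Throwable`, so tasks are identified here by `String` IDs instead of `Task` objects. `RuntimeException` stands in for Kafka's `TimeoutException`, and the factory method names `forAllTasks`/`perTask` are made up.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical "exception container": one shared timeout, or one timeout per task ID. */
public class TimeoutContainer extends RuntimeException {
    private final RuntimeException timeoutException;           // set when one commit covered all tasks
    private final Map<String, RuntimeException> exceptions;    // set when tasks were committed individually

    private TimeoutContainer(final RuntimeException timeoutException,
                             final Map<String, RuntimeException> exceptions) {
        super(timeoutException != null
                  ? "Commit timed out for all tasks"
                  : "Commit timed out for tasks " + exceptions.keySet(),
              timeoutException);
        this.timeoutException = timeoutException;
        this.exceptions = exceptions;
    }

    /** A single commit covered all tasks (e.g. at-least-once, eos-beta). */
    static TimeoutContainer forAllTasks(final RuntimeException timeoutException) {
        return new TimeoutContainer(Objects.requireNonNull(timeoutException), Collections.emptyMap());
    }

    /** Tasks were committed one by one (e.g. eos-alpha); keep each failed task's timeout. */
    static TimeoutContainer perTask(final Map<String, RuntimeException> exceptions) {
        return new TimeoutContainer(null, Collections.unmodifiableMap(new HashMap<>(exceptions)));
    }

    /** Non-null only when a single timeout applies to all tasks. */
    RuntimeException timeoutException() {
        return timeoutException;
    }

    Map<String, RuntimeException> exceptions() {
        return exceptions;
    }
}
```

A caller checks `timeoutException()` first and otherwise iterates `exceptions()`, which mirrors the branching in the `TaskManager.commit()` diff in this thread.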







[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540292



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -903,6 +913,15 @@ void shutdown(final boolean clean) {
                         tasksToCloseClean.remove(task);
                     }
                 }
+            } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {

Review comment:
       During shutdown, we don't need to trigger `task.timeout.ms` but can re-throw the `TimeoutException` to trigger a "dirty close" instead.
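A hypothetical sketch of this shutdown path. A commit timeout starts no `task.timeout.ms` bookkeeping, and the affected task is simply routed to a dirty close. The PR rethrows the exception so that existing error handling closes the task dirty; here that is collapsed into a single catch. `Committer`, `CommitTimeoutException`, and `commitOrMarkDirty` are illustrative names, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownCloseSketch {
    /** Hypothetical stand-in for a commit timeout. */
    static final class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(final String message) {
            super(message);
        }
    }

    /** Hypothetical commit hook for a single task. */
    interface Committer {
        void commit(String taskId);
    }

    /**
     * Commits each task before shutdown and returns the IDs that must be closed dirty.
     * A timeout does not start any `task.timeout.ms` bookkeeping; the task is just marked dirty.
     */
    static List<String> commitOrMarkDirty(final List<String> taskIds, final Committer committer) {
        final List<String> closeDirty = new ArrayList<>();
        for (final String taskId : taskIds) {
            try {
                committer.commit(taskId);
            } catch (final CommitTimeoutException timeout) {
                closeDirty.add(taskId);
            }
        }
        return closeDirty;
    }
}
```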







[GitHub] [kafka] mjsax commented on a change in pull request #9570: KAFKA-9274: Handle TimeoutException on commit

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9570:
URL: https://github.com/apache/kafka/pull/9570#discussion_r518540199



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -570,6 +575,11 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         // so we would capture any exception and throw
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskTimeoutExceptions taskTimeoutExceptions) {

Review comment:
       If a task is revoked, we don't need to trigger `task.timeout.ms` but can re-throw the `TimeoutException` to trigger a "dirty close" instead.



