You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/02 02:07:55 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`

mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r568272373



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;
+            } else {
+                record = null;
+                throw new TaskCorruptedException(Collections.singletonMap(id, changelogPartitions()));

Review comment:
       Not sure if starting the timeout at all makes sense for this case?
   
   Also, starting it when we transit to RUNNING seems to be a non-trivial change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org