Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/18 17:11:35 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

wcarlson5 opened a new pull request #11791:
URL: https://github.com/apache/kafka/pull/11791


   When we hit an exception while processing tasks, we should save the work we have done so far.
   This is only relevant with ALOS, not EOS. It will actually reduce the number of duplicated records in ALOS because, in many cases, we will no longer successfully process a task more than once.
   
   The behavior was rather thoroughly tested. There is an `EmitOnChangeIntegrationTest` that makes sure that a task that hits an exception gets reprocessed. I just added another task that is processed successfully and check that it gets processed too.
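
To illustrate the intent of the change, here is a minimal, self-contained sketch. It is not the Kafka Streams implementation: `CommitOnErrorSketch`, `SketchTask`, and `processAll` are hypothetical names, and "commit" is modeled as appending a task id to a list. It only shows that tasks which finished before the failure are committed before the error propagates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model; none of these names are real Kafka Streams classes.
final class CommitOnErrorSketch {

    // Stand-in for a stream task: processing either succeeds or throws.
    interface SketchTask {
        String id();
        void process();
    }

    static SketchTask task(final String id, final boolean fails) {
        return new SketchTask() {
            @Override public String id() { return id; }
            @Override public void process() {
                if (fails) {
                    throw new RuntimeException("Kaboom in " + id);
                }
            }
        };
    }

    // Processes tasks in order. If one throws, the ids of the tasks that already
    // finished are appended to `committed` (our stand-in for a commit) and the
    // error is rethrown.
    static void processAll(final List<SketchTask> tasks, final List<String> committed) {
        final Set<String> successfullyProcessed = new LinkedHashSet<>();
        try {
            for (final SketchTask t : tasks) {
                t.process();
                successfullyProcessed.add(t.id());
            }
        } catch (final RuntimeException e) {
            committed.addAll(successfullyProcessed);
            throw e;
        }
    }

    public static void main(final String[] args) {
        final List<String> committed = new ArrayList<>();
        try {
            processAll(
                Arrays.asList(task("0_0", false), task("1_0", true), task("2_0", false)),
                committed);
        } catch (final RuntimeException e) {
            System.out.println("error: " + e.getMessage());
        }
        System.out.println("committed: " + committed);
    }
}
```

Running `main` prints the error from `1_0` followed by `committed: [0_0]`. Task `2_0` is never reached, so it is not committed.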
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11791:
URL: https://github.com/apache/kafka/pull/11791#discussion_r812270892



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -80,6 +89,9 @@ private long processTask(final Task task, final int maxNumRecords, final Time ti
                 task.clearTaskTimeout();
                 processed++;
             }
+            if (processingMode == AT_LEAST_ONCE) {

Review comment:
       Can be `AT_LEAST_ONCE` or `EXACTLY_ONCE_V1`







[GitHub] [kafka] wcarlson5 commented on pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11791:
URL: https://github.com/apache/kafka/pull/11791#issuecomment-1044861346


   @guozhangwang @ableegoldman This is ready for a pass when you have a sec





[GitHub] [kafka] guozhangwang commented on a change in pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11791:
URL: https://github.com/apache/kafka/pull/11791#discussion_r812235789



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
##########
@@ -140,6 +160,86 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                     new KeyValue<>(1, "B")
                 )
             );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordsAfterFailures() throws Exception {
+        final Properties properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+            )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(inputTopic2).to(outputTopic2);
+        builder.table(inputTopic, Materialized.as("test-store"))

Review comment:
       We can simplify it to `stream().peek(throw).to()` without materializing a store.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -234,6 +244,15 @@ private void updateTaskCommitMetadata(final Map<TopicPartition, OffsetAndMetadat
         }
     }
 
+    private void commitSuccessfullyProcessedTasks() {
+        if (processingMode == AT_LEAST_ONCE && !tasks.successfullyProcessed().isEmpty()) {

Review comment:
       The check on the processingMode happens in two places, which seems redundant to me: e.g. if it is not ALOS, then `successfullyProcessed()` should always be empty here. I think we can simplify this line to only consider the second condition.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -62,9 +64,16 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
      */
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
-
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        Task lastProcessed = null;
+        try {
+            for (final Task task : tasks.activeTasks()) {
+                lastProcessed = task;
+                totalProcessed += processTask(task, maxNumRecords, time);
+            }
+        } catch (final Exception e) {
+            tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);

Review comment:
       If an exception is thrown here, it means we did not reach line 93 below, so this task should not have been added to the set, and hence we never need to call `removeTaskFromCuccessfullyProcessedBeforeClosing`, right?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
##########
@@ -140,6 +160,86 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                     new KeyValue<>(1, "B")
                 )
             );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordsAfterFailures() throws Exception {
+        final Properties properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+            )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(inputTopic2).to(outputTopic2);
+        builder.table(inputTopic, Materialized.as("test-store"))
+            .toStream()
+            .map((key, value) -> {
+                throw new RuntimeException("Kaboom");
+            })
+            .to(outputTopic);
+
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Arrays.asList(
+                    new KeyValue<>(1, "A")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(

Review comment:
       Should we also check that `outputTopic` never sees the record `1 -> A`, since that task kept throwing an exception?







[GitHub] [kafka] guozhangwang commented on a change in pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11791:
URL: https://github.com/apache/kafka/pull/11791#discussion_r812257023



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
##########
@@ -140,6 +160,86 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                     new KeyValue<>(1, "B")
                 )
             );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordsAfterFailures() throws Exception {
+        final Properties properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+            )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(inputTopic2).to(outputTopic2);
+        builder.table(inputTopic, Materialized.as("test-store"))
+            .toStream()
+            .map((key, value) -> {
+                throw new RuntimeException("Kaboom");
+            })
+            .to(outputTopic);
+
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Arrays.asList(
+                    new KeyValue<>(1, "A")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),

Review comment:
       Also, I think that by just checking that 1->A and 1->B are there, we do not guarantee that there are no duplicates due to re-processing, right? I think we should check that the offset on the input topic can be committed and also that there are no duplicates in the output.
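
A small sketch of the duplicate check suggested here, assuming the consumed output has already been collected into a list. `DuplicateCheckSketch` and `findDuplicates` are hypothetical helpers, not part of `IntegrationTestUtils`, and records are modeled as plain strings such as `"1=A"` purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; not an existing Kafka test utility.
final class DuplicateCheckSketch {

    // Returns every record that was already seen earlier in the list,
    // in the order the repeats occur.
    static <T> List<T> findDuplicates(final List<T> records) {
        final Set<T> seen = new HashSet<>();
        final List<T> duplicates = new ArrayList<>();
        for (final T record : records) {
            if (!seen.add(record)) {
                duplicates.add(record);
            }
        }
        return duplicates;
    }

    public static void main(final String[] args) {
        final List<String> expected = Arrays.asList("1=A", "1=B");
        final List<String> reprocessed = Arrays.asList("1=A", "1=B", "1=A", "1=B");

        // Both outputs contain the expected records, so a containment check passes for both.
        System.out.println(reprocessed.containsAll(expected));
        // Only the duplicate check tells them apart.
        System.out.println(findDuplicates(expected));
        System.out.println(findDuplicates(reprocessed));
    }
}
```

This works for this test because the two input records are distinct. A topology whose input legitimately repeats a key-value pair would need a count-based comparison instead.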







[GitHub] [kafka] guozhangwang commented on pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11791:
URL: https://github.com/apache/kafka/pull/11791#issuecomment-1048475792


   > @guozhangwang I understand the issue with performance and we wouldn't want to leave it that way for sure. But seeing that this is a strict improvement over the current behavior, as right now it won't make any progress in the case you describe, I don't think we need to hold off merging this. Also I think it should make writing tests for the backing off and reordering PR easier so it might make sense to merge it.
   
   To make sure that we do not introduce side effects beyond the named topology, how about we narrow the scope of this optimization to only named topology for now? Besides this, sounds reasonable to me.





[GitHub] [kafka] wcarlson5 commented on a change in pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11791:
URL: https://github.com/apache/kafka/pull/11791#discussion_r812318275



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
##########
@@ -140,6 +160,86 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                     new KeyValue<>(1, "B")
                 )
             );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordsAfterFailures() throws Exception {
+        final Properties properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+            )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(inputTopic2).to(outputTopic2);
+        builder.table(inputTopic, Materialized.as("test-store"))
+            .toStream()
+            .map((key, value) -> {
+                throw new RuntimeException("Kaboom");
+            })
+            .to(outputTopic);
+
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Arrays.asList(
+                    new KeyValue<>(1, "A")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(

Review comment:
       I will count using peek after the exception

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -62,9 +64,16 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
      */
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
-
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        Task lastProcessed = null;
+        try {
+            for (final Task task : tasks.activeTasks()) {
+                lastProcessed = task;
+                totalProcessed += processTask(task, maxNumRecords, time);
+            }
+        } catch (final Exception e) {
+            tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);

Review comment:
       Only if we hit the error in the first round of processing. We could have already processed it once this iteration and be coming back to do it again. I ran into this issue while testing, and I'm pretty sure that is why it is coming up again.
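
The scenario described above can be sketched as follows. This is a simplified model under stated assumptions, not the `TaskExecutor` code: `RemoveFailedTaskSketch` and `run` are hypothetical, tasks are plain string ids, and the exception is swallowed so the resulting set can be inspected. A task that succeeded in an earlier pass is already in the set when it fails in a later pass, so it has to be removed before committing.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical model of repeated processing passes over the same tasks.
final class RemoveFailedTaskSketch {

    // Runs `passes` rounds over all task ids. `processor` receives (taskId, pass)
    // and may throw. Returns the ids that would be committed after a failure.
    static Set<String> run(final List<String> taskIds,
                           final int passes,
                           final BiConsumer<String, Integer> processor,
                           final boolean removeFailedTask) {
        final Set<String> successfullyProcessed = new LinkedHashSet<>();
        String lastProcessed = null;
        try {
            for (int pass = 0; pass < passes; pass++) {
                for (final String id : taskIds) {
                    lastProcessed = id;
                    processor.accept(id, pass);
                    successfullyProcessed.add(id);
                }
            }
        } catch (final RuntimeException e) {
            // Swallowed here only so the sketch can return the set.
            if (removeFailedTask) {
                successfullyProcessed.remove(lastProcessed);
            }
        }
        return successfullyProcessed;
    }

    public static void main(final String[] args) {
        // Task 1_0 succeeds on the first pass and fails on the second.
        final BiConsumer<String, Integer> processor = (id, pass) -> {
            if (id.equals("1_0") && pass == 1) {
                throw new RuntimeException("Kaboom");
            }
        };
        final List<String> ids = Arrays.asList("0_0", "1_0");
        System.out.println(run(ids, 2, processor, true));
        System.out.println(run(ids, 2, processor, false));
    }
}
```

With removal enabled only `0_0` remains in the set. Without it, `1_0` stays in the set even though its latest pass failed.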

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
##########
@@ -140,6 +160,86 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                     new KeyValue<>(1, "B")
                 )
             );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordsAfterFailures() throws Exception {
+        final Properties properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+            )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(inputTopic2).to(outputTopic2);
+        builder.table(inputTopic, Materialized.as("test-store"))
+            .toStream()
+            .map((key, value) -> {
+                throw new RuntimeException("Kaboom");
+            })
+            .to(outputTopic);
+
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Arrays.asList(
+                    new KeyValue<>(1, "A")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),

Review comment:
       Yeah good idea. I will check to make sure it doesn't get processed more than once.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -234,6 +244,15 @@ private void updateTaskCommitMetadata(final Map<TopicPartition, OffsetAndMetadat
         }
     }
 
+    private void commitSuccessfullyProcessedTasks() {
+        if (processingMode == AT_LEAST_ONCE && !tasks.successfullyProcessed().isEmpty()) {

Review comment:
       Sounds good. I have no issues removing this check.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -80,6 +89,9 @@ private long processTask(final Task task, final int maxNumRecords, final Time ti
                 task.clearTaskTimeout();
                 processed++;
             }
+            if (processingMode == AT_LEAST_ONCE) {

Review comment:
       How about `!EXACTLY_ONCE_V2`?
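
For comparison, a minimal sketch of the two ways to write this condition. The `Mode` enum below is hypothetical and just uses the three names mentioned in this thread; the actual enum in Kafka Streams may use different constants.

```java
// Hypothetical enum and checks, for illustration only.
final class ProcessingModeSketch {

    enum Mode { AT_LEAST_ONCE, EXACTLY_ONCE_V1, EXACTLY_ONCE_V2 }

    // Lists the modes that should track successfully processed tasks.
    static boolean explicitCheck(final Mode mode) {
        return mode == Mode.AT_LEAST_ONCE || mode == Mode.EXACTLY_ONCE_V1;
    }

    // Excludes the one mode that should not.
    static boolean negatedCheck(final Mode mode) {
        return mode != Mode.EXACTLY_ONCE_V2;
    }

    public static void main(final String[] args) {
        for (final Mode mode : Mode.values()) {
            System.out.println(mode + ": explicit=" + explicitCheck(mode)
                + ", negated=" + negatedCheck(mode));
        }
    }
}
```

The two checks agree only as long as the enum has exactly these three values. If a new mode were added, the negated form would include it by default while the explicit form would not.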







[GitHub] [kafka] ableegoldman merged pull request #11791: KAFKA-13676: Commit successfully processed tasks on error

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #11791:
URL: https://github.com/apache/kafka/pull/11791


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #11791: KAFKA-13676: Commit tasks on error if ALOS

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11791:
URL: https://github.com/apache/kafka/pull/11791#discussion_r812503629



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -245,9 +244,10 @@ private void updateTaskCommitMetadata(final Map<TopicPartition, OffsetAndMetadat
     }
 
     private void commitSuccessfullyProcessedTasks() {
-        if (processingMode == AT_LEAST_ONCE && !tasks.successfullyProcessed().isEmpty()) {
-            log.error("Streams encountered an error when processing tasks." +
-                " Will commit all previously successfully processed tasks");
+        if (!tasks.successfullyProcessed().isEmpty()) {
+            log.info("Streams encountered an error when processing tasks." +
+                " Will commit all previously successfully processed tasks {}",
+                tasks.successfullyProcessed().toString());

Review comment:
       For future reference, you can skip the `toString()` -- the logger should make the conversion implicitly



