Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/19 18:27:15 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

mjsax commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788014733



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Objects;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+
+/**
+ * This class represents a version of a {@link StampedRecord} that failed to deserialize. We need
+ * a special record type so that {@link StreamTask} can update consumed offsets. See KAFKA-6502
+ * for more details.
+ */
+public class CorruptedRecord extends StampedRecord {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord;
+
+    CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
+        super(null, ConsumerRecord.NO_TIMESTAMP);

Review comment:
       Why not pass `rawRecord` to `super` instead of `null`? It seems we could then avoid overriding all the methods (or I don't understand why we need to override them).
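       To make the suggestion concrete, here is a minimal sketch using hypothetical, simplified stand-ins (`RawRecord`, `SimpleStampedRecord`, `SimpleCorruptedRecord`) rather than the real Kafka classes. It assumes, as the comment implies, that the base class's accessors delegate to the wrapped record; if the raw record is handed to `super` instead of `null`, the subclass inherits `topic()`, `partition()` and `offset()` and needs no overrides.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins for ConsumerRecord, StampedRecord and
// CorruptedRecord; they only model the accessors relevant to this comment.
public class CorruptedRecordSketch {

    // Stand-in for ConsumerRecord<byte[], byte[]> (raw, undeserialized bytes).
    static final class RawRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] value;

        RawRecord(final String topic, final int partition, final long offset, final byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    // Stand-in for StampedRecord: every accessor delegates to the wrapped record.
    static class SimpleStampedRecord {
        static final long NO_TIMESTAMP = -1L; // same value as ConsumerRecord.NO_TIMESTAMP

        final RawRecord record;
        final long timestamp;

        SimpleStampedRecord(final RawRecord record, final long timestamp) {
            this.record = record;
            this.timestamp = timestamp;
        }

        String topic() { return record.topic; }
        int partition() { return record.partition; }
        long offset() { return record.offset; }
    }

    // The suggestion: pass the raw record to super instead of null,
    // so topic()/partition()/offset() are inherited without overrides.
    static final class SimpleCorruptedRecord extends SimpleStampedRecord {
        SimpleCorruptedRecord(final RawRecord rawRecord) {
            super(Objects.requireNonNull(rawRecord), NO_TIMESTAMP);
        }
    }

    public static void main(final String[] args) {
        final SimpleCorruptedRecord corrupted =
            new SimpleCorruptedRecord(new RawRecord("input", 0, 42L, new byte[] {0x7f}));
        // The offset is still available, which is what the task needs in order to commit past it.
        System.out.println(corrupted.topic() + "-" + corrupted.partition() + "@" + corrupted.offset());
    }
}
```

       Running `main` prints `input-0@42`. Whether every real `StampedRecord` accessor is safe to inherit unchanged (for example `key()`/`value()` on undeserialized bytes) is something the PR author would still need to confirm.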

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));

Review comment:
       nit: as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       nit (formatting): this could be a single line, or:
   ```
   task = createStatelessTask(createConfig(
       AT_LEAST_ONCE,
       "0",
       LogAndContinueExceptionHandler.class.getName()
   ));
   ```
   
   It's hard to read if the first parameter `AT_LEAST_ONCE` has a different indentation than the others.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(1, encodeTimestamp(0))))));
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(2, encodeTimestamp(0))))));

Review comment:
       as above
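       The expected commit values in these three tests can be cross-checked with a small model. This is a hypothetical sketch, not `StreamTask` itself: it assumes the committed offset is the last consumed offset plus one (Kafka's convention of committing the next offset to read), that corrupted records advance the consumed offset but not the partition time, and that the partition time starts at -1, matching `encodeTimestamp(-1)` in the all-corrupted test. It models only the final commit values, not how many records a single `process()` call consumes.

```java
// Hypothetical model of the commit values asserted by the new StreamTaskTest cases;
// this is not Kafka's StreamTask or RecordQueue.
public class CommitOffsetSketch {

    // A polled record: its offset, its timestamp, and whether deserialization failed.
    static final class Polled {
        final long offset;
        final long timestamp;
        final boolean corrupted;

        Polled(final long offset, final long timestamp, final boolean corrupted) {
            this.offset = offset;
            this.timestamp = timestamp;
            this.corrupted = corrupted;
        }
    }

    // What the task would commit for one partition after consuming some records.
    static final class PartitionProgress {
        long lastConsumedOffset = -1L; // advanced by every record, corrupted or not
        long partitionTime = -1L;      // assumed "unknown" start; only valid records advance it

        void consume(final Polled record) {
            lastConsumedOffset = record.offset;
            if (!record.corrupted) {
                partitionTime = Math.max(partitionTime, record.timestamp);
            }
        }

        // The committed offset is the offset of the next record to read.
        long commitOffset() {
            return lastConsumedOffset + 1;
        }
    }

    // Named after the test helper; assumed timestamp == offset.
    static Polled valid(final long offset) {
        return new Polled(offset, offset, false);
    }

    // Named after the corrupted-record test helper; its timestamp is never read.
    static Polled corrupt(final long offset) {
        return new Polled(offset, offset, true);
    }

    static PartitionProgress replay(final Polled... records) {
        final PartitionProgress progress = new PartitionProgress();
        for (final Polled record : records) {
            progress.consume(record);
        }
        return progress;
    }

    public static void main(final String[] args) {
        final PartitionProgress allCorrupted = replay(corrupt(0), corrupt(1));
        final PartitionProgress validAfterCorrupted = replay(corrupt(0), valid(1));
        final PartitionProgress corruptedAfterValid = replay(valid(0), corrupt(1));
        System.out.println(allCorrupted.commitOffset() + " " + allCorrupted.partitionTime);               // 2 -1
        System.out.println(validAfterCorrupted.commitOffset() + " " + validAfterCorrupted.partitionTime); // 2 1
        System.out.println(corruptedAfterValid.commitOffset() + " " + corruptedAfterValid.partitionTime); // 2 0
    }
}
```

       These match the assertions above: `offset + 1` with `encodeTimestamp(-1)` when all records are corrupted, `offset + 1` with `encodeTimestamp(offset)` when a valid record follows a corrupted one, and 1 then 2 with `encodeTimestamp(0)` when a corrupted record follows a valid one.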

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(1, encodeTimestamp(0))))));

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       same question

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       nit: as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));

Review comment:
       Nit (formatting):
   ```
   assertThat(
       task.prepareCommit(),
       equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset + 1, encodeTimestamp(-1)))))
   );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       Why do we need to add records to the consumer and call `poll()`? Why isn't `task.addRecords()` sufficient to set up the test?



