Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/17 11:40:55 UTC

[GitHub] [kafka] Gerrrr opened a new pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Gerrrr opened a new pull request #11683:
URL: https://github.com/apache/kafka/pull/11683


   Without this patch, RecordQueue simply skips records that fail to
   deserialize when the DeserializationExceptionHandler is set to
   LogAndContinueExceptionHandler. As a result, if the entire stream
   consists of corrupted records, the task never updates its consumed
   offsets.
   
   This patch introduces a new record type, CorruptedRecord, which wraps
   a raw record that failed to deserialize. RecordQueue's headRecord
   becomes a CorruptedRecord if there were records in the fifoQueue and
   all of them failed to deserialize. In turn, StreamTask#process checks
   that a fetched record is not corrupted before processing it. If the
   record is an instance of CorruptedRecord, it updates the consumed
   offset, sets commitNeeded to true, and returns.
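   
   As an illustration, the new check in StreamTask#process could look
   roughly like the following sketch (based on the description above,
   not the exact patch; the field and helper names are approximations):
   
   ```java
   // Sketch only: consumedOffsets, commitNeeded, and the helper name are
   // assumed here; the real patch may differ in details.
   private boolean processCorrupted(final StampedRecord record,
                                    final TopicPartition partition) {
       if (record instanceof CorruptedRecord) {
           // Advance the consumed offset past the corrupted record so it is
           // included in the next commit, mark the task as needing a commit,
           // and skip normal processing.
           consumedOffsets.put(partition, record.offset());
           commitNeeded = true;
           return true;
       }
       return false; // a healthy record; continue with regular processing
   }
   ```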
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] Gerrrr commented on pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#issuecomment-1014719355


   The test failure, https://github.com/apache/kafka/pull/11683/checks?check_run_id=4842442884, does not seem related.





[GitHub] [kafka] Gerrrr commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788050048



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       Consider `committableOffsetsAndMetadata`:
   
   https://github.com/apache/kafka/blob/ed8b7875fed82feccec3d35aa700cce1ea5138f9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L438-L442
   
   and
   
   https://github.com/apache/kafka/blob/ed8b7875fed82feccec3d35aa700cce1ea5138f9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L453-L454
   
    The objective of these test cases is to show that the consumed offsets change according to the new logic. At the time of the assertions, there are no records left in the `fifoQueue`, so `offset` is `null` at line 438. Consequently, the code falls back to checking the consumer's position, and the consumer's position is only advanced by `poll`.
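
    For reference, the fallback described above amounts to something like this sketch (paraphrased from the linked StreamTask code; `headRecordOffset` stands in for the value read from the record queue):

    ```java
    // Sketch of the offset fallback in StreamTask#committableOffsetsAndMetadata.
    Long committableOffset(final TopicPartition partition,
                           final Long headRecordOffset, // null when fifoQueue is empty
                           final Consumer<byte[], byte[]> mainConsumer) {
        Long offset = headRecordOffset;
        if (offset == null) {
            // No buffered records remain for this partition, so fall back to
            // the consumer's position. The position only advances on poll(),
            // which is why the tests call consumer.poll(Duration.ZERO) first.
            offset = mainConsumer.position(partition);
        }
        return offset;
    }
    ```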







[GitHub] [kafka] Gerrrr commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788051969



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(1, encodeTimestamp(0))))));
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(2, encodeTimestamp(0))))));

Review comment:
       Fixed in c7ebf2f.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(1, encodeTimestamp(0))))));

Review comment:
       Fixed in c7ebf2f.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       Fixed in c7ebf2f.







[GitHub] [kafka] Gerrrr commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788053117



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));

Review comment:
       Fixed in c7ebf2f.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       Fixed in c7ebf2f.







[GitHub] [kafka] Gerrrr commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788533743



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Objects;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+
+/**
+ * This class represents a version of a {@link StampedRecord} that failed to deserialize. We need
+ * a special record type so that {@link StreamTask} can update consumed offsets. See KAFKA-6502
+ * for more details.
+ */
+public class CorruptedRecord extends StampedRecord {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord;
+
+    CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
+        super(null, ConsumerRecord.NO_TIMESTAMP);

Review comment:
       Fixed in 19225a4.







[GitHub] [kafka] Gerrrr commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788052839



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       Fixed in c7ebf2f.







[GitHub] [kafka] mjsax merged pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #11683:
URL: https://github.com/apache/kafka/pull/11683


   





[GitHub] [kafka] mjsax commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788014733



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Objects;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+
+/**
+ * This class represents a version of a {@link StampedRecord} that failed to deserialize. We need
+ * a special record type so that {@link StreamTask} can update consumed offsets. See KAFKA-6502
+ * for more details.
+ */
+public class CorruptedRecord extends StampedRecord {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord;
+
+    CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
+        super(null, ConsumerRecord.NO_TIMESTAMP);

Review comment:
    Why not pass `rawRecord` (instead of replacing it with `null`)? It seems we could avoid overriding all the methods (or I don't understand why we need to override them)?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));

Review comment:
       nit: as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
    nit (formatting): this could be a single line, or:
   ```
   task = createStatelessTask(createConfig(
       AT_LEAST_ONCE,
       "0",
       LogAndContinueExceptionHandler.class.getName()
   ));
   ```
   
    It's hard to read when the first parameter `AT_LEAST_ONCE` has different indentation than the others.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(1, encodeTimestamp(0))))));
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(2, encodeTimestamp(0))))));

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(1, encodeTimestamp(0))))));

Review comment:
       as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       same question

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       nit: as above

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));

Review comment:
       Nit (formatting):
   ```
   assertThat(
       task.prepareCommit(),
       equalTo(mkMap(mkEntry(partition, new OffsetAndMetadata(offset + 1, encodeTimestamp(-1)))))
   );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
    Why do we need to add records to the consumer and call `poll()`? Why isn't `task.addRecords()` sufficient to set up the test?







[GitHub] [kafka] mjsax commented on pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#issuecomment-1017744320


   Thanks for the fix! Merged to `trunk`.





[GitHub] [kafka] Gerrrr commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788034547



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Objects;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+
+/**
+ * This class represents a version of a {@link StampedRecord} that failed to deserialize. We need
+ * a special record type so that {@link StreamTask} can update consumed offsets. See KAFKA-6502
+ * for more details.
+ */
+public class CorruptedRecord extends StampedRecord {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord;
+
+    CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
+        super(null, ConsumerRecord.NO_TIMESTAMP);

Review comment:
    We can't pass `rawRecord` to the constructor because its type is `ConsumerRecord<byte[], byte[]>` while `StampedRecord` expects a `ConsumerRecord<Object, Object>`. Java generics are invariant, so `ConsumerRecord<byte[], byte[]>` is not a subtype of `ConsumerRecord<Object, Object>`, even though `byte[]` itself is a subtype of `Object`.
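
    A short illustration of that invariance (a hypothetical snippet, not code from the PR):

    ```java
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    final ConsumerRecord<byte[], byte[]> raw =
        new ConsumerRecord<>("topic", 0, 0L, new byte[0], new byte[0]);

    // Does not compile: generics are invariant, so a ConsumerRecord<byte[], byte[]>
    // is not a ConsumerRecord<Object, Object>, even though byte[] is an Object.
    // final ConsumerRecord<Object, Object> widened = raw;
    ```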







[GitHub] [kafka] mjsax commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788055037



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Objects;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+
+/**
+ * This class represents a version of a {@link StampedRecord} that failed to deserialize. We need
+ * a special record type so that {@link StreamTask} can update consumed offsets. See KAFKA-6502
+ * for more details.
+ */
+public class CorruptedRecord extends StampedRecord {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord;
+
+    CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
+        super(null, ConsumerRecord.NO_TIMESTAMP);

Review comment:
       Ah. That's annoying... I am usually a big advocate of using generics for type safety, but for this case, I am wondering if we should sacrifice it to avoid boilerplate code?
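
       A minimal sketch of what sacrificing the generics could look like: cast the raw
       record once and suppress the unchecked warning instead of copying every field
       into a new ConsumerRecord<Object, Object>. (Hypothetical; not necessarily what
       the patch ends up doing.)

           // Hypothetical alternative constructor for CorruptedRecord.
           @SuppressWarnings("unchecked")
           CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
               // The double cast through the wildcard type is legal but unchecked.
               super((ConsumerRecord<Object, Object>) (ConsumerRecord<?, ?>) rawRecord,
                     ConsumerRecord.NO_TIMESTAMP);
               this.rawRecord = rawRecord;
           }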

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);

Review comment:
       Thanks!





[GitHub] [kafka] Gerrrr commented on a change in pull request #11683: KAFKA-6502: Update consumed offsets on corrupted records.

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on a change in pull request #11683:
URL: https://github.com/apache/kafka/pull/11683#discussion_r788053269



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, ++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(offset))))));

Review comment:
       Fixed in c7ebf2f.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -2221,14 +2231,94 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
     }
 
     @Test
-    public void shouldCLearTaskTimeout() {
+    public void shouldClearTaskTimeout() {
         task = createStatelessTask(createConfig());
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
         task.clearTaskTimeout();
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null);
     }
 
+    @Test
+    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        long offset = -1;
+        final List<ConsumerRecord<byte[], byte[]>> records = asList(
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
+        consumer.addRecord(records.get(0));
+        consumer.addRecord(records.get(1));
+        consumer.poll(Duration.ZERO);
+        task.addRecords(partition1, records);
+
+        assertTrue(task.process(offset));
+        assertTrue(task.commitNeeded());
+        assertThat(task.prepareCommit(),
+            equalTo(mkMap(mkEntry(partition1,
+                new OffsetAndMetadata(offset + 1, encodeTimestamp(-1))))));
+    }
+
+    @Test
+    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE,
+            "0",
+            LogAndContinueExceptionHandler.class.getName()));

Review comment:
       Fixed in c7ebf2f.



