You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/11 08:48:24 UTC

[kafka] branch 2.1 updated: KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 8157e98  KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
8157e98 is described below

commit 8157e984e92161c4e68678f158d05837edac2e62
Author: linyli001 <45...@users.noreply.github.com>
AuthorDate: Tue Dec 11 16:40:18 2018 +0800

    KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../kafka/clients/consumer/MockConsumer.java       |  4 +++
 .../processor/internals/StoreChangelogReader.java  |  2 ++
 .../internals/StoreChangelogReaderTest.java        | 36 ++++++++++++++++++++++
 3 files changed, 42 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9eee6da..4af3ba1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -191,6 +191,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             if (!subscriptions.isPaused(entry.getKey())) {
                 final List<ConsumerRecord<K, V>> recs = entry.getValue();
                 for (final ConsumerRecord<K, V> rec : recs) {
+                    if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
+                        throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
+                    }
+
                     if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
                         results.get(entry.getKey()).add(rec);
                         subscriptions.position(entry.getKey(), rec.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34e6e5c..fdd9d6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -107,6 +107,8 @@ public class StoreChangelogReader implements ChangelogReader {
                 needsInitializing.remove(partition);
                 needsRestoring.remove(partition);
 
+                final StateRestorer restorer = stateRestorers.get(partition);
+                restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
                 task.reinitializeStateStoresForPartitions(recoverableException.partitions());
             }
             restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ae48f57..aa6395b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -156,6 +156,42 @@ public class StoreChangelogReaderTest {
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
+    @Test
+    public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
+        final int messages = 10;
+        final int startOffset = 5;
+        final long expiredCheckpoint = 1L;
+        assignPartition(messages, topicPartition);
+        consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset));
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
+
+        addRecords(messages, topicPartition, startOffset);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+
+        final StateRestorer stateRestorer = new StateRestorer(
+                topicPartition,
+                restoreListener,
+                expiredCheckpoint,
+                Long.MAX_VALUE,
+                true,
+                "storeName");
+        changelogReader.register(stateRestorer);
+
+        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        EasyMock.replay(active, task);
+
+        // first restore call "fails" due to an OffsetOutOfRangeException, but we should not die with an exception
+        assertEquals(0, changelogReader.restore(active).size());
+        // the checkpoint offset of stateRestorer should have been reset to NO_CHECKPOINT
+        assertThat(stateRestorer.checkpoint(), equalTo(-1L));
+
+        // register the restorer again so the next restore call retries it
+        changelogReader.register(stateRestorer);
+        // the second restore call should return the completed partition without an exception
+        assertEquals(1, changelogReader.restore(active).size());
+        // the number of restored records should equal the number of messages
+        assertThat(callback.restored.size(), equalTo(messages));
+    }
 
     @Test
     public void shouldRestoreMessagesFromCheckpoint() {