You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/11 09:17:40 UTC

[kafka] branch 2.0 updated: KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 071d6de  KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
071d6de is described below

commit 071d6de3ed1a909fc8a85d801cbda41fd70c14f3
Author: linyli001 <45...@users.noreply.github.com>
AuthorDate: Tue Dec 11 16:40:18 2018 +0800

    KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../kafka/clients/consumer/MockConsumer.java       |  4 +++
 .../processor/internals/StoreChangelogReader.java  |  2 ++
 .../internals/StoreChangelogReaderTest.java        | 36 ++++++++++++++++++++++
 3 files changed, 42 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index cf1b07f..6f455e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -191,6 +191,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             if (!subscriptions.isPaused(entry.getKey())) {
                 final List<ConsumerRecord<K, V>> recs = entry.getValue();
                 for (final ConsumerRecord<K, V> rec : recs) {
+                    if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
+                        throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
+                    }
+
                     if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
                         results.get(entry.getKey()).add(rec);
                         subscriptions.position(entry.getKey(), rec.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 9185920..8729b84 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -108,6 +108,8 @@ public class StoreChangelogReader implements ChangelogReader {
                 needsInitializing.remove(partition);
                 needsRestoring.remove(partition);
 
+                final StateRestorer restorer = stateRestorers.get(partition);
+                restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
                 task.reinitializeStateStoresForPartitions(recoverableException.partitions());
             }
             restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ae48f57..aa6395b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -156,6 +156,42 @@ public class StoreChangelogReaderTest {
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
+    @Test
+    public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
+        final int messages = 10;
+        final int startOffset = 5;
+        final long expiredCheckpoint = 1L;
+        assignPartition(messages, topicPartition);
+        consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset));
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
+
+        addRecords(messages, topicPartition, startOffset);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+
+        final StateRestorer stateRestorer = new StateRestorer(
+                topicPartition,
+                restoreListener,
+                expiredCheckpoint,
+                Long.MAX_VALUE,
+                true,
+                "storeName");
+        changelogReader.register(stateRestorer);
+
+        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        EasyMock.replay(active, task);
+
+        // first restore call "fails" since OffsetOutOfRangeException but we should not die with an exception
+        assertEquals(0, changelogReader.restore(active).size());
+        //the starting offset for stateRestorer is set to NO_CHECKPOINT
+        assertThat(stateRestorer.checkpoint(), equalTo(-1L));
+
+        //restore the active task again
+        changelogReader.register(stateRestorer);
+        //the restored task should return completed partition without Exception.
+        assertEquals(1, changelogReader.restore(active).size());
+        //the restored size should be equal to message length.
+        assertThat(callback.restored.size(), equalTo(messages));
+    }
 
     @Test
     public void shouldRestoreMessagesFromCheckpoint() {