You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/11 09:17:40 UTC
[kafka] branch 2.0 updated: KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 071d6de KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
071d6de is described below
commit 071d6de3ed1a909fc8a85d801cbda41fd70c14f3
Author: linyli001 <45...@users.noreply.github.com>
AuthorDate: Tue Dec 11 16:40:18 2018 +0800
KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
Reviewer: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../kafka/clients/consumer/MockConsumer.java | 4 +++
.../processor/internals/StoreChangelogReader.java | 2 ++
.../internals/StoreChangelogReaderTest.java | 36 ++++++++++++++++++++++
3 files changed, 42 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index cf1b07f..6f455e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -191,6 +191,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
for (final ConsumerRecord<K, V> rec : recs) {
+ if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
+ throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
+ }
+
if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
results.get(entry.getKey()).add(rec);
subscriptions.position(entry.getKey(), rec.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 9185920..8729b84 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -108,6 +108,8 @@ public class StoreChangelogReader implements ChangelogReader {
needsInitializing.remove(partition);
needsRestoring.remove(partition);
+ final StateRestorer restorer = stateRestorers.get(partition);
+ restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
}
restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ae48f57..aa6395b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -156,6 +156,42 @@ public class StoreChangelogReaderTest {
assertThat(callback.restored.size(), equalTo(messages));
}
+ @Test
+ public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
+ final int messages = 10;
+ final int startOffset = 5;
+ final long expiredCheckpoint = 1L;
+ assignPartition(messages, topicPartition);
+ consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset));
+ consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
+
+ addRecords(messages, topicPartition, startOffset);
+ consumer.assign(Collections.<TopicPartition>emptyList());
+
+ final StateRestorer stateRestorer = new StateRestorer(
+ topicPartition,
+ restoreListener,
+ expiredCheckpoint,
+ Long.MAX_VALUE,
+ true,
+ "storeName");
+ changelogReader.register(stateRestorer);
+
+ EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ EasyMock.replay(active, task);
+
+ // first restore call "fails" with an OffsetOutOfRangeException, but we should not die with an exception
+ assertEquals(0, changelogReader.restore(active).size());
+ //the starting offset for stateRestorer is set to NO_CHECKPOINT
+ assertThat(stateRestorer.checkpoint(), equalTo(-1L));
+
+ //restore the active task again
+ changelogReader.register(stateRestorer);
+ // the second restore call should return the completed partition without throwing an exception
+ assertEquals(1, changelogReader.restore(active).size());
+ //the restored size should be equal to message length.
+ assertThat(callback.restored.size(), equalTo(messages));
+ }
@Test
public void shouldRestoreMessagesFromCheckpoint() {