You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/11 08:40:31 UTC
[kafka] branch trunk updated: KAFKA-7443: OffsetOutOfRangeException
in restoring state store from changelog topic when start offset of local
checkpoint is smaller than that of changelog topic (#5946)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a94c8da KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
a94c8da is described below
commit a94c8da5083381d9ca4224b1489e1dc205bf55bb
Author: linyli001 <45...@users.noreply.github.com>
AuthorDate: Tue Dec 11 16:40:18 2018 +0800
KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
Reviewer: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../kafka/clients/consumer/MockConsumer.java | 4 +++
.../processor/internals/StoreChangelogReader.java | 2 ++
.../internals/StoreChangelogReaderTest.java | 36 ++++++++++++++++++++++
3 files changed, 42 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index e43c292..f877f9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -188,6 +188,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
for (final ConsumerRecord<K, V> rec : recs) {
+ if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
+ throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
+ }
+
if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
subscriptions.position(entry.getKey(), rec.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34e6e5c..fdd9d6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -107,6 +107,8 @@ public class StoreChangelogReader implements ChangelogReader {
needsInitializing.remove(partition);
needsRestoring.remove(partition);
+ final StateRestorer restorer = stateRestorers.get(partition);
+ restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
}
restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 34f0a32..d08f0d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -157,6 +157,42 @@ public class StoreChangelogReaderTest {
assertThat(callback.restored.size(), equalTo(messages));
}
+ @Test
+ public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
+ final int messages = 10;
+ final int startOffset = 5;
+ final long expiredCheckpoint = 1L;
+ assignPartition(messages, topicPartition);
+ consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset));
+ consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
+
+ addRecords(messages, topicPartition, startOffset);
+ consumer.assign(Collections.<TopicPartition>emptyList());
+
+ final StateRestorer stateRestorer = new StateRestorer(
+ topicPartition,
+ restoreListener,
+ expiredCheckpoint,
+ Long.MAX_VALUE,
+ true,
+ "storeName");
+ changelogReader.register(stateRestorer);
+
+ EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ EasyMock.replay(active, task);
+
+ // first restore call "fails" since OffsetOutOfRangeException but we should not die with an exception
+ assertEquals(0, changelogReader.restore(active).size());
+ //the starting offset for stateRestorer is set to NO_CHECKPOINT
+ assertThat(stateRestorer.checkpoint(), equalTo(-1L));
+
+ //restore the active task again
+ changelogReader.register(stateRestorer);
+ //the restored task should return completed partition without Exception.
+ assertEquals(1, changelogReader.restore(active).size());
+ //the restored size should be equal to message length.
+ assertThat(callback.restored.size(), equalTo(messages));
+ }
@Test
public void shouldRestoreMessagesFromCheckpoint() {