You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/11 09:27:37 UTC
[kafka] branch 1.1 updated: KAFKA-7443: OffsetOutOfRangeException
in restoring state store from changelog topic when start offset of local
checkpoint is smaller than that of changelog topic (#5946)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 4cdbb3e KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
4cdbb3e is described below
commit 4cdbb3e5c19142d118f0f3999dd3e21deccb3643
Author: linyli001 <45...@users.noreply.github.com>
AuthorDate: Tue Dec 11 16:40:18 2018 +0800
KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946)
Reviewer: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../kafka/clients/consumer/MockConsumer.java | 4 +++
.../processor/internals/StoreChangelogReader.java | 3 ++
.../internals/StoreChangelogReaderTest.java | 36 ++++++++++++++++++++++
3 files changed, 43 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ceb7024..07cb415 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -184,6 +184,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
for (final ConsumerRecord<K, V> rec : recs) {
+ if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) {
+ throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey())));
+ }
+
if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
results.get(entry.getKey()).add(rec);
subscriptions.position(entry.getKey(), rec.offset() + 1);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index e0fc82d..501ddef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -96,6 +96,9 @@ public class StoreChangelogReader implements ChangelogReader {
for (final TopicPartition partition : partitions) {
final StreamTask task = active.restoringTaskFor(partition);
log.info("Reinitializing StreamTask {}", task);
+
+ final StateRestorer restorer = stateRestorers.get(partition);
+ restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
}
restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index a117dc3..5391f31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -152,6 +152,42 @@ public class StoreChangelogReaderTest {
assertThat(callback.restored.size(), equalTo(messages));
}
+ @Test
+ public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
+ final int messages = 10;
+ final int startOffset = 5;
+ final long expiredCheckpoint = 1L;
+ assignPartition(messages, topicPartition);
+ consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset));
+ consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
+
+ addRecords(messages, topicPartition, startOffset);
+ consumer.assign(Collections.<TopicPartition>emptyList());
+
+ final StateRestorer stateRestorer = new StateRestorer(
+ topicPartition,
+ restoreListener,
+ expiredCheckpoint,
+ Long.MAX_VALUE,
+ true,
+ "storeName");
+ changelogReader.register(stateRestorer);
+
+ EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ EasyMock.replay(active, task);
+
+ // first restore call "fails" since OffsetOutOfRangeException but we should not die with an exception
+ assertEquals(0, changelogReader.restore(active).size());
+ //the starting offset for stateRestorer is set to NO_CHECKPOINT
+ assertThat(stateRestorer.checkpoint(), equalTo(-1L));
+
+ //restore the active task again
+ changelogReader.register(stateRestorer);
+ //the restored task should return completed partition without Exception.
+ assertEquals(1, changelogReader.restore(active).size());
+ //the restored size should be equal to message length.
+ assertThat(callback.restored.size(), equalTo(messages));
+ }
@Test
public void shouldRestoreMessagesFromCheckpoint() {