You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/14 20:44:02 UTC
[kafka] branch trunk updated: KAFKA-6364: Second check for ensuring
changelog topic not changed during restore (#4511)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27b56b1 KAFKA-6364: Second check for ensuring changelog topic not changed during restore (#4511)
27b56b1 is described below
commit 27b56b1458d0af0a397e9f47ecd008c10c088c6d
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Feb 14 15:43:57 2018 -0500
KAFKA-6364: Second check for ensuring changelog topic not changed during restore (#4511)
Added a second check for the race condition in which a store's changelog topic is updated during restore; the check is not performed for a KTable changelog topic. This will be tricky to test, but I wanted to push the PR to get feedback on the approach.
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
.../kafka/clients/consumer/MockConsumer.java | 35 +++++++++++++++---
.../processor/internals/StoreChangelogReader.java | 9 +++++
.../internals/StoreChangelogReaderTest.java | 43 +++++++++++++++++++++-
3 files changed, 81 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c492995..ceb7024 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -51,7 +51,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final Map<String, List<PartitionInfo>> partitions;
private final SubscriptionState subscriptions;
private final Map<TopicPartition, Long> beginningOffsets;
- private final Map<TopicPartition, Long> endOffsets;
+ private final Map<TopicPartition, List<Long>> endOffsets;
private final Map<TopicPartition, OffsetAndMetadata> committed;
private final Queue<Runnable> pollTasks;
private final Set<TopicPartition> paused;
@@ -290,8 +290,26 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
}
- public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
- endOffsets.putAll(newOffsets);
+ // needed for cases where you make a second call to endOffsets
+ public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+ innerUpdateEndOffsets(newOffsets, false);
+ }
+
+ public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+ innerUpdateEndOffsets(newOffsets, true);
+ }
+
+ private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
+ final boolean replace) {
+
+ for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
+ List<Long> offsets = endOffsets.get(entry.getKey());
+ if (replace || offsets == null) {
+ offsets = new ArrayList<>();
+ }
+ offsets.add(entry.getValue());
+ endOffsets.put(entry.getKey(), offsets);
+ }
}
@Override
@@ -354,7 +372,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
Map<TopicPartition, Long> result = new HashMap<>();
for (TopicPartition tp : partitions) {
- Long endOffset = endOffsets.get(tp);
+ Long endOffset = getEndOffset(endOffsets.get(tp));
if (endOffset == null)
throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
result.put(tp, endOffset);
@@ -430,7 +448,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
} else if (strategy == OffsetResetStrategy.LATEST) {
- offset = endOffsets.get(tp);
+ offset = getEndOffset(endOffsets.get(tp));
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
} else {
@@ -438,4 +456,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
seek(tp, offset);
}
+
+ private Long getEndOffset(List<Long> offsets) {
+ if (offsets == null || offsets.isEmpty()) {
+ return null;
+ }
+ return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index b11c45b..5fcba76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -255,6 +255,15 @@ public class StoreChangelogReader implements ChangelogReader {
throw new TaskMigratedException(task, topicPartition, endOffset, pos);
}
+ // need to check for changelog topic
+ if (restorer.offsetLimit() == Long.MAX_VALUE) {
+ final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
+ if (!restorer.hasCompleted(pos, updatedEndOffset)) {
+ throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
+ }
+ }
+
+
log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
topicPartition,
restorer.restoredNumRecords(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e69cede..c65d4ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -390,9 +390,50 @@ public class StoreChangelogReaderTest {
try {
changelogReader.restore(active);
fail("Should have thrown TaskMigratedException");
- } catch (final TaskMigratedException expected) { /* ignore */ }
+ } catch (final TaskMigratedException expected) {
+ /* ignore */
+ }
}
+
+ @Test
+ public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
+ final int messages = 10;
+ setupConsumer(messages, topicPartition);
+ // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic
+ // so a subsequent call to endOffsets returns a value exceeding the expected end value
+ consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ try {
+ changelogReader.restore(active);
+ fail("Should have thrown TaskMigratedException");
+ } catch (final TaskMigratedException expected) {
+ // verifies second block threw exception with updated end offset
+ assertTrue(expected.getMessage().contains("end offset 15, current offset 10"));
+ }
+ }
+
+
+ @Test
+ public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
+ final int messages = 10;
+ setupConsumer(messages, topicPartition);
+ // in this case first call to endOffsets returns correct value, but a second thread has updated the source topic
+ // but since it's a source topic, the second check should not fire hence no exception
+ consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 9L, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ changelogReader.restore(active);
+ }
+
+
@Test
public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
final int totalMessages = 10;
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.