Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/14 20:44:02 UTC

[kafka] branch trunk updated: KAFKA-6364: Second check for ensuring changelog topic not changed during restore (#4511)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27b56b1  KAFKA-6364: Second check for ensuring changelog topic not changed during restore (#4511)
27b56b1 is described below

commit 27b56b1458d0af0a397e9f47ecd008c10c088c6d
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Feb 14 15:43:57 2018 -0500

    KAFKA-6364: Second check for ensuring changelog topic not changed during restore (#4511)
    
    Added a second check for the race condition where the store changelog topic is updated by another thread during restore; the check is skipped when a source topic is reused as the changelog (e.g. for a KTable built directly from a source topic), since those restores are bounded by an offset limit. This will be tricky to test, but I wanted to push the PR to get feedback on the approach.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
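
A minimal standalone sketch of how this race can be exercised against the MockConsumer
changes in this patch (illustration only, not part of the commit; the topic name,
partition and offsets are made up):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class SecondEndOffsetCheckSketch {
        public static void main(final String[] args) {
            final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            final TopicPartition tp = new TopicPartition("store-changelog", 0); // hypothetical changelog partition

            // The first endOffsets() call sees 10; the queued 15 simulates another thread
            // appending to the changelog topic while restoration is still in progress.
            consumer.updateEndOffsets(Collections.singletonMap(tp, 10L));
            consumer.addEndOffsets(Collections.singletonMap(tp, 15L));

            System.out.println(consumer.endOffsets(Collections.singleton(tp))); // {store-changelog-0=10}
            System.out.println(consumer.endOffsets(Collections.singleton(tp))); // {store-changelog-0=15}
        }
    }

The restore loop's first completion check passes against the end offset captured up front
(10 here); the new second check re-fetches the end offset, sees 15, and throws
TaskMigratedException for plain changelog topics (restores with no offset limit).
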
---
 .../kafka/clients/consumer/MockConsumer.java       | 35 +++++++++++++++---
 .../processor/internals/StoreChangelogReader.java  |  9 +++++
 .../internals/StoreChangelogReaderTest.java        | 43 +++++++++++++++++++++-
 3 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c492995..ceb7024 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -51,7 +51,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Map<String, List<PartitionInfo>> partitions;
     private final SubscriptionState subscriptions;
     private final Map<TopicPartition, Long> beginningOffsets;
-    private final Map<TopicPartition, Long> endOffsets;
+    private final Map<TopicPartition, List<Long>> endOffsets;
     private final Map<TopicPartition, OffsetAndMetadata> committed;
     private final Queue<Runnable> pollTasks;
     private final Set<TopicPartition> paused;
@@ -290,8 +290,26 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
     }
 
-    public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
-        endOffsets.putAll(newOffsets);
+    // queues additional end offsets so that a subsequent call to endOffsets can return an updated value
+    public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        innerUpdateEndOffsets(newOffsets, false);
+    }
+
+    public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        innerUpdateEndOffsets(newOffsets, true);
+    }
+
+    private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
+                                       final boolean replace) {
+
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
+            List<Long> offsets = endOffsets.get(entry.getKey());
+            if (replace || offsets == null) {
+                offsets = new ArrayList<>();
+            }
+            offsets.add(entry.getValue());
+            endOffsets.put(entry.getKey(), offsets);
+        }
     }
 
     @Override
@@ -354,7 +372,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
-            Long endOffset = endOffsets.get(tp);
+            Long endOffset = getEndOffset(endOffsets.get(tp));
             if (endOffset == null)
                 throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
             result.put(tp, endOffset);
@@ -430,7 +448,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             if (offset == null)
                 throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
         } else if (strategy == OffsetResetStrategy.LATEST) {
-            offset = endOffsets.get(tp);
+            offset = getEndOffset(endOffsets.get(tp));
             if (offset == null)
                 throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
         } else {
@@ -438,4 +456,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         }
         seek(tp, offset);
     }
+
+    private Long getEndOffset(List<Long> offsets) {
+        if (offsets == null || offsets.isEmpty()) {
+            return null;
+        }
+        return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index b11c45b..5fcba76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -255,6 +255,15 @@ public class StoreChangelogReader implements ChangelogReader {
                 throw new TaskMigratedException(task, topicPartition, endOffset, pos);
             }
 
+            // second check: for plain changelog topics (no offset limit), verify the end offset has not moved during restore
+            if (restorer.offsetLimit() == Long.MAX_VALUE) {
+                final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
+                if (!restorer.hasCompleted(pos, updatedEndOffset)) {
+                    throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
+                }
+            }
+
+
             log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
                       topicPartition,
                       restorer.restoredNumRecords(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e69cede..c65d4ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -390,9 +390,50 @@ public class StoreChangelogReaderTest {
         try {
             changelogReader.restore(active);
             fail("Should have thrown TaskMigratedException");
-        } catch (final TaskMigratedException expected) { /* ignore */ }
+        } catch (final TaskMigratedException expected) {
+            /* ignore */
+        }
     }
 
+
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        // in this case the first call to endOffsets returns the correct value, but a second thread has updated the changelog topic,
+        // so a subsequent call to endOffsets returns a value exceeding the expected end offset
+        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown TaskMigratedException");
+        } catch (final TaskMigratedException expected) {
+            // verifies second block threw exception with updated end offset
+            assertTrue(expected.getMessage().contains("end offset 15, current offset 10"));
+        }
+    }
+
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        // in this case the first call to endOffsets returns the correct value, but a second thread has updated the source topic;
+        // since it is a source topic (the restore has an offset limit), the second check should not fire, hence no exception
+        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 9L, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+    }
+
+
     @Test
     public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
         final int totalMessages = 10;
