You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/02 18:41:20 UTC

[kafka] branch 1.0 updated: KAFKA-6269: KTable restore fails after rebalance (#4300)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 357f3ed  KAFKA-6269: KTable restore fails after rebalance (#4300)
357f3ed is described below

commit 357f3edcfa1a436cc846ee95b38671801ed96f1d
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Jan 2 12:42:40 2018 -0500

    KAFKA-6269: KTable restore fails after rebalance (#4300)
    
    * Return the offset of the next record when records are left over after the restore has completed
    * Changed check for restoring partition to remove the "+1" in the guard condition
    
    Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/MockConsumer.java       |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  29 ++-
 .../KTableSourceTopicRestartIntegrationTest.java   | 260 +++++++++++++++++++++
 .../internals/StoreChangelogReaderTest.java        | 146 ++++++++++--
 4 files changed, 410 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9b0c058..246c4d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -189,7 +189,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.get(tp);
         if (recs == null) {
-            recs = new ArrayList<ConsumerRecord<K, V>>();
+            recs = new ArrayList<>();
             this.records.put(tp, recs);
         }
         recs.add(record);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bbe570c..950a2c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -173,8 +173,8 @@ public class StoreChangelogReader implements ChangelogReader {
         for (final StateRestorer restorer : needsPositionUpdate) {
             final long position = consumer.position(restorer.partition());
             logRestoreOffsets(restorer.partition(),
-                    position,
-                    endOffsets.get(restorer.partition()));
+                              position,
+                              endOffsets.get(restorer.partition()));
             restorer.setStartingOffset(position);
             restorer.restoreStarted();
         }
@@ -238,7 +238,7 @@ public class StoreChangelogReader implements ChangelogReader {
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
         restorer.setRestoredOffset(pos);
         if (restorer.hasCompleted(pos, endOffset)) {
-            if (pos > endOffset + 1) {
+            if (pos > endOffset) {
                 throw new TaskMigratedException(task, topicPartition, endOffset, pos);
             }
 
@@ -257,30 +257,39 @@ public class StoreChangelogReader implements ChangelogReader {
                              final StateRestorer restorer,
                              final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
-        long offset = -1;
-
+        long nextPosition = -1;
+        int numberRecords = records.size();
+        int numberRestored = 0;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
-            offset = record.offset();
+            final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
+                nextPosition = record.offset();
                 break;
             }
+            numberRestored++;
             if (record.key() != null) {
                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
             }
         }
 
-        if (offset == -1) {
-            offset = consumer.position(restorer.partition());
+        // if we are restoring from a changelog topic, we should have restored all records in the list;
+        // otherwise, if we did not fully restore to that point, we need to set nextPosition
+        // to the position of the restoreConsumer, which will cause a TaskMigratedException
+        if (nextPosition == -1 || (restorer.offsetLimit() == Long.MAX_VALUE && numberRecords != numberRestored)) {
+            nextPosition = consumer.position(restorer.partition());
         }
 
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
-            restorer.restoreBatchCompleted(offset + 1, records.size());
+            restorer.restoreBatchCompleted(nextPosition, records.size());
+
         }
 
-        return consumer.position(restorer.partition());
+        return nextPosition;
     }
 
+
+
     private boolean hasPartition(final TopicPartition topicPartition) {
         final List<PartitionInfo> partitions = partitionInfo.get(topicPartition.topic());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
new file mode 100644
index 0000000..4942b21
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Category({IntegrationTest.class})
+public class KTableSourceTopicRestartIntegrationTest {
+
+
+    private static final int NUM_BROKERS = 3;
+    private static final String SOURCE_TOPIC = "source-topic";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final Time time = CLUSTER.time;
+    private KafkaStreams streamsOne;
+    private final StreamsBuilder streamsBuilder = new StreamsBuilder();
+    private final Map<String, String> readKeyValues = new ConcurrentHashMap<>();
+
+    private static final Properties PRODUCER_CONFIG = new Properties();
+    private static final Properties STREAMS_CONFIG = new Properties();
+    private Map<String, String> expectedInitialResultsMap;
+    private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
+
+
+    @BeforeClass
+    public static void setUpBeforeAllTests() throws Exception {
+
+        CLUSTER.createTopic(SOURCE_TOPIC);
+
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+    }
+
+    @Before
+    public void before() {
+
+        final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC);
+        kTable.toStream().foreach(new ForeachAction<String, String>() {
+            @Override
+            public void apply(final String key, final String value) {
+                readKeyValues.put(key, value);
+            }
+        });
+
+        expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
+        expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
+    }
+
+    @After
+    public void after() throws IOException {
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+    }
+
+
+    @Test
+    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
+
+        try {
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("a", "b", "c");
+
+            assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+            streamsOne.close();
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            // on restore start, the state restore listener appends one record (key "d") to the source topic
+            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streamsOne.start();
+
+            produceKeyValues("f", "g", "h");
+
+            assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+
+        } finally {
+            streamsOne.close(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
+
+        try {
+            STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("a", "b", "c");
+
+            assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+            streamsOne.close();
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            // on restore start, the state restore listener appends one record (key "d") to the source topic
+            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streamsOne.start();
+
+            produceKeyValues("f", "g", "h");
+
+            assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+
+        } finally {
+            streamsOne.close(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
+
+        try {
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("a", "b", "c");
+
+            assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+            streamsOne.close();
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("f", "g", "h");
+
+            final Map<String, String> expectedValues = createExpectedResultsMap("a", "b", "c", "f", "g", "h");
+
+            assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");
+
+        } finally {
+            streamsOne.close(5, TimeUnit.SECONDS);
+        }
+    }
+
+    private void assertNumberValuesRead(final Map<String, String> valueMap,
+                                        final Map<String, String> expectedMap,
+                                        final String errorMessage) throws InterruptedException {
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return valueMap.equals(expectedMap);
+            }
+        }, errorMessage);
+
+    }
+
+    private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {
+        final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
+
+        for (final String key : keys) {
+            keyValueList.add(new KeyValue<>(key, key + "1"));
+        }
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(SOURCE_TOPIC,
+                                                           keyValueList,
+                                                           PRODUCER_CONFIG,
+                                                           time);
+    }
+
+    private Map<String, String> createExpectedResultsMap(final String... keys) {
+        final Map<String, String> expectedMap = new HashMap<>();
+        for (final String key : keys) {
+            expectedMap.put(key, key + "1");
+        }
+        return expectedMap;
+    }
+
+    private class UpdatingSourceTopicOnRestoreStartStateRestoreListener implements StateRestoreListener {
+
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long startingOffset,
+                                   final long endingOffset) {
+            try {
+                produceKeyValues("d");
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void onBatchRestored(final TopicPartition topicPartition,
+                                    final String storeName,
+                                    final long batchEndOffset,
+                                    final long numRestored) {
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long totalRestored) {
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 3c54851..9401029 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -86,8 +86,7 @@ public class StoreChangelogReaderTest {
         };
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
     }
@@ -107,8 +106,7 @@ public class StoreChangelogReaderTest {
     public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(messages));
     }
@@ -257,8 +255,7 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
@@ -267,8 +264,7 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -332,12 +328,11 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore() {
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-            "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -348,6 +343,125 @@ public class StoreChangelogReaderTest {
         } catch (final TaskMigratedException expected) { /* ignore */ }
     }
 
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
+        final int totalMessages = 10;
+        assignPartition(totalMessages, topicPartition);
+        // records 0..4
+        addRecords(5, topicPartition, 0);
+        // with EOS enabled, the commit marker is at offset 5, so the remaining records are 6..10
+        addRecords(5, topicPartition, 6);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+
+        // the end offset is 6, i.e. the offset right after the commit marker at offset 5 from above
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown task migrated exception");
+        } catch (final TaskMigratedException expected) {
+            /* ignore */
+        }
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+        // records have offsets 0..9; offset 10 is the commit marker, so 11 is the end offset
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(5));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopic() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 10, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopicEOSEnabled() {
+        final int totalMessages = 10;
+        assignPartition(totalMessages, topicPartition);
+        // records 0..4 last offset before commit is 4
+        addRecords(5, topicPartition, 0);
+        // with EOS enabled, the commit marker is at offset 5, so the remaining records start at 6
+        addRecords(5, topicPartition, 6);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+        // commit marker is 5 so ending offset is 12
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 12L));
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 6, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(5));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+        // records have offsets 0..9; offset 10 is the commit marker, so 11 is the ending offset
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 11, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
     private void setupConsumer(final long messages,
                                final TopicPartition topicPartition) {
         assignPartition(messages, topicPartition);
@@ -367,11 +481,11 @@ public class StoreChangelogReaderTest {
                                  final TopicPartition topicPartition) {
         consumer.updatePartitions(topicPartition.topic(),
                                   Collections.singletonList(
-                                          new PartitionInfo(topicPartition.topic(),
-                                                            topicPartition.partition(),
-                                                            null,
-                                                            null,
-                                                            null)));
+                                      new PartitionInfo(topicPartition.topic(),
+                                                        topicPartition.partition(),
+                                                        null,
+                                                        null,
+                                                        null)));
         consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages)));
         consumer.assign(Collections.singletonList(topicPartition));

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].