Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/02 18:41:20 UTC
[kafka] branch 1.0 updated: KAFKA-6269: KTable restore fails after rebalance (#4300)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 357f3ed KAFKA-6269: KTable restore fails after rebalance (#4300)
357f3ed is described below
commit 357f3edcfa1a436cc846ee95b38671801ed96f1d
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Tue Jan 2 12:42:40 2018 -0500
KAFKA-6269: KTable restore fails after rebalance (#4300)
* Return the offset of the next record when records remain after restore has completed
* Change the restoring-partition check to remove the "+1" from the guard condition
Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wa...@gmail.com>
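In short, processNext() now returns the offset of the first record that was not restored (the "next position") rather than the restore consumer's raw position, and the completion guard compares against endOffset instead of endOffset + 1. The following self-contained sketch illustrates that control flow on plain values; RestoreRecord, hasCompleted and the long/List parameters are simplified stand-ins for the ConsumerRecord, StateRestorer and consumer types used in StoreChangelogReader below, not the actual Kafka Streams API.

import java.util.ArrayList;
import java.util.List;

public class RestoreBatchSketch {

    // Illustrative stand-in for a fetched changelog record (offset/key/value only).
    static final class RestoreRecord {
        final long offset;
        final byte[] key;
        final byte[] value;

        RestoreRecord(final long offset, final byte[] key, final byte[] value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Mirrors the idea of StateRestorer#hasCompleted: restoration is finished once the
    // offset reaches the smaller of the changelog end offset and the configured offset
    // limit (Long.MAX_VALUE for a changelog topic, a committed offset for a source topic).
    static boolean hasCompleted(final long offset, final long endOffset, final long offsetLimit) {
        return offset >= Math.min(endOffset, offsetLimit);
    }

    // Corrected flow: return the offset of the first record that was NOT restored
    // (the "next position"), falling back to the consumer position only when every
    // fetched record of a changelog topic was consumed.
    static long processNext(final List<RestoreRecord> records,
                            final long endOffset,
                            final long offsetLimit,
                            final long consumerPosition,
                            final List<RestoreRecord> restored) {
        long nextPosition = -1;
        int numberRestored = 0;
        for (final RestoreRecord record : records) {
            if (hasCompleted(record.offset, endOffset, offsetLimit)) {
                nextPosition = record.offset;   // restore stops at the first unrestored offset
                break;
            }
            numberRestored++;
            if (record.key != null) {
                restored.add(record);
            }
        }
        // For a changelog topic (no offset limit) every fetched record should have been
        // restored; if not, fall back to the consumer position, which may exceed the end
        // offset and trigger the caller's pos > endOffset task-migration check.
        if (nextPosition == -1 || (offsetLimit == Long.MAX_VALUE && records.size() != numberRestored)) {
            nextPosition = consumerPosition;
        }
        return nextPosition;
    }

    public static void main(final String[] args) {
        final List<RestoreRecord> batch = new ArrayList<>();
        for (long offset = 0; offset < 10; offset++) {
            batch.add(new RestoreRecord(offset, new byte[]{1}, new byte[]{2}));
        }
        final List<RestoreRecord> restored = new ArrayList<>();
        // Source-topic style restore with an offset limit of 5: only offsets 0..4 are
        // restored, and the returned next position is 5 (the first unrestored offset),
        // not the consumer's position of 10, so no spurious task migration is signalled.
        final long next = processNext(batch, 10L, 5L, 10L, restored);
        System.out.println("restored=" + restored.size() + " nextPosition=" + next);
    }
}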
---
.../kafka/clients/consumer/MockConsumer.java | 2 +-
.../processor/internals/StoreChangelogReader.java | 29 ++-
.../KTableSourceTopicRestartIntegrationTest.java | 260 +++++++++++++++++++++
.../internals/StoreChangelogReaderTest.java | 146 ++++++++++--
4 files changed, 410 insertions(+), 27 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9b0c058..246c4d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -189,7 +189,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
List<ConsumerRecord<K, V>> recs = this.records.get(tp);
if (recs == null) {
- recs = new ArrayList<ConsumerRecord<K, V>>();
+ recs = new ArrayList<>();
this.records.put(tp, recs);
}
recs.add(record);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bbe570c..950a2c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -173,8 +173,8 @@ public class StoreChangelogReader implements ChangelogReader {
for (final StateRestorer restorer : needsPositionUpdate) {
final long position = consumer.position(restorer.partition());
logRestoreOffsets(restorer.partition(),
- position,
- endOffsets.get(restorer.partition()));
+ position,
+ endOffsets.get(restorer.partition()));
restorer.setStartingOffset(position);
restorer.restoreStarted();
}
@@ -238,7 +238,7 @@ public class StoreChangelogReader implements ChangelogReader {
final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
restorer.setRestoredOffset(pos);
if (restorer.hasCompleted(pos, endOffset)) {
- if (pos > endOffset + 1) {
+ if (pos > endOffset) {
throw new TaskMigratedException(task, topicPartition, endOffset, pos);
}
@@ -257,30 +257,39 @@ public class StoreChangelogReader implements ChangelogReader {
final StateRestorer restorer,
final Long endOffset) {
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
- long offset = -1;
-
+ long nextPosition = -1;
+ int numberRecords = records.size();
+ int numberRestored = 0;
for (final ConsumerRecord<byte[], byte[]> record : records) {
- offset = record.offset();
+ final long offset = record.offset();
if (restorer.hasCompleted(offset, endOffset)) {
+ nextPosition = record.offset();
break;
}
+ numberRestored++;
if (record.key() != null) {
restoreRecords.add(KeyValue.pair(record.key(), record.value()));
}
}
- if (offset == -1) {
- offset = consumer.position(restorer.partition());
+ // for a changelog topic we should have restored every record in the fetched list;
+ // if we did not, set nextPosition to the restore consumer's position, which will
+ // cause a TaskMigratedException to be thrown by the caller
+ if (nextPosition == -1 || (restorer.offsetLimit() == Long.MAX_VALUE && numberRecords != numberRestored)) {
+ nextPosition = consumer.position(restorer.partition());
}
if (!restoreRecords.isEmpty()) {
restorer.restore(restoreRecords);
- restorer.restoreBatchCompleted(offset + 1, records.size());
+ restorer.restoreBatchCompleted(nextPosition, records.size());
+
}
- return consumer.position(restorer.partition());
+ return nextPosition;
}
+
+
private boolean hasPartition(final TopicPartition topicPartition) {
final List<PartitionInfo> partitions = partitionInfo.get(topicPartition.topic());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
new file mode 100644
index 0000000..4942b21
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Category({IntegrationTest.class})
+public class KTableSourceTopicRestartIntegrationTest {
+
+
+ private static final int NUM_BROKERS = 3;
+ private static final String SOURCE_TOPIC = "source-topic";
+
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ private final Time time = CLUSTER.time;
+ private KafkaStreams streamsOne;
+ private final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ private final Map<String, String> readKeyValues = new ConcurrentHashMap<>();
+
+ private static final Properties PRODUCER_CONFIG = new Properties();
+ private static final Properties STREAMS_CONFIG = new Properties();
+ private Map<String, String> expectedInitialResultsMap;
+ private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
+
+
+ @BeforeClass
+ public static void setUpBeforeAllTests() throws Exception {
+
+ CLUSTER.createTopic(SOURCE_TOPIC);
+
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
+ STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+ STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
+ STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+
+ PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+ PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ }
+
+ @Before
+ public void before() {
+
+ final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC);
+ kTable.toStream().foreach(new ForeachAction<String, String>() {
+ @Override
+ public void apply(final String key, final String value) {
+ readKeyValues.put(key, value);
+ }
+ });
+
+ expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
+ expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
+ }
+
+ @After
+ public void after() throws IOException {
+ IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+ }
+
+
+ @Test
+ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
+
+ try {
+ streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+ streamsOne.start();
+
+ produceKeyValues("a", "b", "c");
+
+ assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+ streamsOne.close();
+ streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+ // the state restore listener will append one record to the log
+ streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+ streamsOne.start();
+
+ produceKeyValues("f", "g", "h");
+
+ assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+
+ } finally {
+ streamsOne.close(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
+
+ try {
+ STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+ streamsOne.start();
+
+ produceKeyValues("a", "b", "c");
+
+ assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+ streamsOne.close();
+ streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+ // the state restore listener will append one record to the log
+ streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+ streamsOne.start();
+
+ produceKeyValues("f", "g", "h");
+
+ assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+
+ } finally {
+ streamsOne.close(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
+
+ try {
+ streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+ streamsOne.start();
+
+ produceKeyValues("a", "b", "c");
+
+ assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+ streamsOne.close();
+ streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+ streamsOne.start();
+
+ produceKeyValues("f", "g", "h");
+
+ final Map<String, String> expectedValues = createExpectedResultsMap("a", "b", "c", "f", "g", "h");
+
+ assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");
+
+ } finally {
+ streamsOne.close(5, TimeUnit.SECONDS);
+ }
+ }
+
+ private void assertNumberValuesRead(final Map<String, String> valueMap,
+ final Map<String, String> expectedMap,
+ final String errorMessage) throws InterruptedException {
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return valueMap.equals(expectedMap);
+ }
+ }, errorMessage);
+
+ }
+
+ private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {
+ final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
+
+ for (final String key : keys) {
+ keyValueList.add(new KeyValue<>(key, key + "1"));
+ }
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(SOURCE_TOPIC,
+ keyValueList,
+ PRODUCER_CONFIG,
+ time);
+ }
+
+ private Map<String, String> createExpectedResultsMap(final String... keys) {
+ final Map<String, String> expectedMap = new HashMap<>();
+ for (final String key : keys) {
+ expectedMap.put(key, key + "1");
+ }
+ return expectedMap;
+ }
+
+ private class UpdatingSourceTopicOnRestoreStartStateRestoreListener implements StateRestoreListener {
+
+ @Override
+ public void onRestoreStart(final TopicPartition topicPartition,
+ final String storeName,
+ final long startingOffset,
+ final long endingOffset) {
+ try {
+ produceKeyValues("d");
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onBatchRestored(final TopicPartition topicPartition,
+ final String storeName,
+ final long batchEndOffset,
+ final long numRestored) {
+ }
+
+ @Override
+ public void onRestoreEnd(final TopicPartition topicPartition,
+ final String storeName,
+ final long totalRestored) {
+ }
+ }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 3c54851..9401029 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -86,8 +86,7 @@ public class StoreChangelogReaderTest {
};
final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
changelogReader.restore(active);
assertTrue(functionCalled.get());
}
@@ -107,8 +106,7 @@ public class StoreChangelogReaderTest {
public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
final int messages = 10;
setupConsumer(messages, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(messages));
}
@@ -257,8 +255,7 @@ public class StoreChangelogReaderTest {
@Test
public void shouldReturnRestoredOffsetsForPersistentStores() {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
changelogReader.restore(active);
final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
@@ -267,8 +264,7 @@ public class StoreChangelogReaderTest {
@Test
public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
- "storeName"));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
changelogReader.restore(active);
final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -332,12 +328,11 @@ public class StoreChangelogReaderTest {
}
@Test
- public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore() {
+ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
final int messages = 10;
setupConsumer(messages, topicPartition);
consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
- "storeName"));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@@ -348,6 +343,125 @@ public class StoreChangelogReaderTest {
} catch (final TaskMigratedException expected) { /* ignore */ }
}
+ @Test
+ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
+ final int totalMessages = 10;
+ assignPartition(totalMessages, topicPartition);
+ // records 0..4
+ addRecords(5, topicPartition, 0);
+ // EOS enabled: commit marker at offset 5, so the remaining records have offsets 6..10
+ addRecords(5, topicPartition, 6);
+ consumer.assign(Collections.<TopicPartition>emptyList());
+
+ // the end offset starts just after the commit marker at offset 5 above
+ consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+ try {
+ changelogReader.restore(active);
+ fail("Should have thrown task migrated exception");
+ } catch (final TaskMigratedException expected) {
+ /* ignore */
+ }
+ }
+
+ @Test
+ public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
+ final int totalMessages = 10;
+ setupConsumer(totalMessages, topicPartition);
+ // records have offsets 0..9; offset 10 is the commit marker, so 11 is the end offset
+ consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ changelogReader.restore(active);
+ assertThat(callback.restored.size(), equalTo(10));
+ }
+
+
+ @Test
+ public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
+ final int totalMessages = 10;
+ setupConsumer(totalMessages, topicPartition);
+
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ changelogReader.restore(active);
+ assertThat(callback.restored.size(), equalTo(10));
+ }
+
+ @Test
+ public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
+ final int messages = 10;
+ setupConsumer(messages, topicPartition);
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ changelogReader.restore(active);
+ assertThat(callback.restored.size(), equalTo(5));
+ }
+
+ @Test
+ public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopic() {
+ final int messages = 10;
+ setupConsumer(messages, topicPartition);
+
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 10, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ changelogReader.restore(active);
+ assertThat(callback.restored.size(), equalTo(10));
+ }
+
+ @Test
+ public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopicEOSEnabled() {
+ final int totalMessages = 10;
+ assignPartition(totalMessages, topicPartition);
+ // records 0..4; the last offset before the commit marker is 4
+ addRecords(5, topicPartition, 0);
+ // EOS enabled: commit marker at offset 5, so the remaining records start at offset 6
+ addRecords(5, topicPartition, 6);
+ consumer.assign(Collections.<TopicPartition>emptyList());
+ // the commit marker is at offset 5; the end offset is set to 12
+ consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 12L));
+
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 6, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ changelogReader.restore(active);
+ assertThat(callback.restored.size(), equalTo(5));
+ }
+
+ @Test
+ public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled() {
+ final int totalMessages = 10;
+ setupConsumer(totalMessages, topicPartition);
+ // records have offsets 0..9; offset 10 is the commit marker, so 11 is the end offset
+ consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 11, true, "storeName"));
+
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+ replay(active);
+
+ changelogReader.restore(active);
+ assertThat(callback.restored.size(), equalTo(10));
+ }
+
private void setupConsumer(final long messages,
final TopicPartition topicPartition) {
assignPartition(messages, topicPartition);
@@ -367,11 +481,11 @@ public class StoreChangelogReaderTest {
final TopicPartition topicPartition) {
consumer.updatePartitions(topicPartition.topic(),
Collections.singletonList(
- new PartitionInfo(topicPartition.topic(),
- topicPartition.partition(),
- null,
- null,
- null)));
+ new PartitionInfo(topicPartition.topic(),
+ topicPartition.partition(),
+ null,
+ null,
+ null)));
consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages)));
consumer.assign(Collections.singletonList(topicPartition));
--
To stop receiving notification emails like this one, please contact commits@kafka.apache.org.