Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/02 17:43:00 UTC

[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

    [ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308416#comment-16308416 ] 

ASF GitHub Bot commented on KAFKA-6269:
---------------------------------------

guozhangwang closed pull request #4300: KAFKA-6269: KTable restore fails after rebalance
URL: https://github.com/apache/kafka/pull/4300
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 10aedbb93c4..d9085b0697f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -197,7 +197,7 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.get(tp);
         if (recs == null) {
-            recs = new ArrayList<ConsumerRecord<K, V>>();
+            recs = new ArrayList<>();
             this.records.put(tp, recs);
         }
         recs.add(record);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 178d2bb96d0..ba17ce95ede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -96,7 +96,6 @@ public void register(final StateRestorer restorer) {
             restoreConsumer.seekToBeginning(partitions);
         }
 
-
         if (needsRestoring.isEmpty()) {
             restoreConsumer.unsubscribe();
         }
@@ -174,8 +173,8 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
                 restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
                 logRestoreOffsets(restorer.partition(),
-                        restorer.checkpoint(),
-                        endOffsets.get(restorer.partition()));
+                                  restorer.checkpoint(),
+                                  endOffsets.get(restorer.partition()));
                 restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
                 restorer.restoreStarted();
             } else {
@@ -187,8 +186,8 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
         for (final StateRestorer restorer : needsPositionUpdate) {
             final long position = restoreConsumer.position(restorer.partition());
             logRestoreOffsets(restorer.partition(),
-                    position,
-                    endOffsets.get(restorer.partition()));
+                              position,
+                              endOffsets.get(restorer.partition()));
             restorer.setStartingOffset(position);
             restorer.restoreStarted();
         }
@@ -252,7 +251,7 @@ private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
         restorer.setRestoredOffset(pos);
         if (restorer.hasCompleted(pos, endOffset)) {
-            if (pos > endOffset + 1) {
+            if (pos > endOffset) {
                 throw new TaskMigratedException(task, topicPartition, endOffset, pos);
             }
 
@@ -271,30 +270,40 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
                              final StateRestorer restorer,
                              final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
-        long offset = -1;
-
+        long nextPosition = -1;
+        int numberRecords = records.size();
+        int numberRestored = 0;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
-            offset = record.offset();
+            final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
+                nextPosition = record.offset();
                 break;
             }
+            numberRestored++;
             if (record.key() != null) {
                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
             }
         }
 
-        if (offset == -1) {
-            offset = restoreConsumer.position(restorer.partition());
+
+        // if we have changelog topic then we should have restored all records in the list
+        // otherwise if we did not fully restore to that point we need to set nextPosition
+        // to the position of the restoreConsumer and we'll cause a TaskMigratedException exception
+        if (nextPosition == -1 || (restorer.offsetLimit() == Long.MAX_VALUE && numberRecords != numberRestored)) {
+            nextPosition = restoreConsumer.position(restorer.partition());
         }
 
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
-            restorer.restoreBatchCompleted(offset + 1, records.size());
+            restorer.restoreBatchCompleted(nextPosition, records.size());
+
         }
 
-        return restoreConsumer.position(restorer.partition());
+        return nextPosition;
     }
 
+
+
     private boolean hasPartition(final TopicPartition topicPartition) {
         final List<PartitionInfo> partitions = partitionInfo.get(topicPartition.topic());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
new file mode 100644
index 00000000000..4942b21266d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Category({IntegrationTest.class})
+public class KTableSourceTopicRestartIntegrationTest {
+
+
+    private static final int NUM_BROKERS = 3;
+    private static final String SOURCE_TOPIC = "source-topic";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final Time time = CLUSTER.time;
+    private KafkaStreams streamsOne;
+    private final StreamsBuilder streamsBuilder = new StreamsBuilder();
+    private final Map<String, String> readKeyValues = new ConcurrentHashMap<>();
+
+    private static final Properties PRODUCER_CONFIG = new Properties();
+    private static final Properties STREAMS_CONFIG = new Properties();
+    private Map<String, String> expectedInitialResultsMap;
+    private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
+
+
+    @BeforeClass
+    public static void setUpBeforeAllTests() throws Exception {
+
+        CLUSTER.createTopic(SOURCE_TOPIC);
+
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+    }
+
+    @Before
+    public void before() {
+
+        final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC);
+        kTable.toStream().foreach(new ForeachAction<String, String>() {
+            @Override
+            public void apply(final String key, final String value) {
+                readKeyValues.put(key, value);
+            }
+        });
+
+        expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
+        expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
+    }
+
+    @After
+    public void after() throws IOException {
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+    }
+
+
+    @Test
+    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
+
+        try {
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("a", "b", "c");
+
+            assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+            streamsOne.close();
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            // the state restore listener will append one record to the log
+            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streamsOne.start();
+
+            produceKeyValues("f", "g", "h");
+
+            assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+
+        } finally {
+            streamsOne.close(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
+
+        try {
+            STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("a", "b", "c");
+
+            assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+            streamsOne.close();
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            // the state restore listener will append one record to the log
+            streamsOne.setGlobalStateRestoreListener(new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
+            streamsOne.start();
+
+            produceKeyValues("f", "g", "h");
+
+            assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+
+        } finally {
+            streamsOne.close(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
+
+        try {
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("a", "b", "c");
+
+            assertNumberValuesRead(readKeyValues, expectedInitialResultsMap, "Table did not read all values");
+
+            streamsOne.close();
+            streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
+            streamsOne.start();
+
+            produceKeyValues("f", "g", "h");
+
+            final Map<String, String> expectedValues = createExpectedResultsMap("a", "b", "c", "f", "g", "h");
+
+            assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");
+
+        } finally {
+            streamsOne.close(5, TimeUnit.SECONDS);
+        }
+    }
+
+    private void assertNumberValuesRead(final Map<String, String> valueMap,
+                                        final Map<String, String> expectedMap,
+                                        final String errorMessage) throws InterruptedException {
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return valueMap.equals(expectedMap);
+            }
+        }, errorMessage);
+
+    }
+
+    private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {
+        final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
+
+        for (final String key : keys) {
+            keyValueList.add(new KeyValue<>(key, key + "1"));
+        }
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(SOURCE_TOPIC,
+                                                           keyValueList,
+                                                           PRODUCER_CONFIG,
+                                                           time);
+    }
+
+    private Map<String, String> createExpectedResultsMap(final String... keys) {
+        final Map<String, String> expectedMap = new HashMap<>();
+        for (final String key : keys) {
+            expectedMap.put(key, key + "1");
+        }
+        return expectedMap;
+    }
+
+    private class UpdatingSourceTopicOnRestoreStartStateRestoreListener implements StateRestoreListener {
+
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long startingOffset,
+                                   final long endingOffset) {
+            try {
+                produceKeyValues("d");
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void onBatchRestored(final TopicPartition topicPartition,
+                                    final String storeName,
+                                    final long batchEndOffset,
+                                    final long numRestored) {
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long totalRestored) {
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 9f6f7121f5a..ee964513415 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -91,8 +91,7 @@ public void shouldRequestTopicsAndHandleTimeoutException() {
         };
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
     }
@@ -105,7 +104,6 @@ public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         EasyMock.replay(mockRestorer);
         changelogReader.register(mockRestorer);
 
-
         consumer.subscribe(Collections.singleton("sometopic"));
 
         try {
@@ -120,8 +118,7 @@ public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
     public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(messages));
     }
@@ -137,7 +134,7 @@ public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
             }
         });
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-            "storeName"));
+                                                   "storeName"));
 
         EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         EasyMock.replay(active);
@@ -294,8 +291,7 @@ public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
     @Test
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
@@ -304,8 +300,7 @@ public void shouldReturnRestoredOffsetsForPersistentStores() {
     @Test
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -369,12 +364,11 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
     }
 
     @Test
-    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore() {
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-            "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -385,6 +379,125 @@ public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
         } catch (final TaskMigratedException expected) { /* ignore */ }
     }
 
+    @Test
+    public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
+        final int totalMessages = 10;
+        assignPartition(totalMessages, topicPartition);
+        // records 0..4
+        addRecords(5, topicPartition, 0);
+        //EOS enabled commit marker at offset 5 so rest of records 6..10
+        addRecords(5, topicPartition, 6);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+
+        // end offsets should start after commit marker of 5 from above
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+        try {
+            changelogReader.restore(active);
+            fail("Should have thrown task migrated exception");
+        } catch (final TaskMigratedException expected) {
+            /* ignore */
+        }
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+        // records have offsets of 0..9 10 is commit marker so 11 is end offset
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(5));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopic() {
+        final int messages = 10;
+        setupConsumer(messages, topicPartition);
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 10, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopicEOSEnabled() {
+        final int totalMessages = 10;
+        assignPartition(totalMessages, topicPartition);
+        // records 0..4 last offset before commit is 4
+        addRecords(5, topicPartition, 0);
+        //EOS enabled so commit marker at offset 5 so records start at 6
+        addRecords(5, topicPartition, 6);
+        consumer.assign(Collections.<TopicPartition>emptyList());
+        // commit marker is 5 so ending offset is 12
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 12L));
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 6, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(5));
+    }
+
+    @Test
+    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled() {
+        final int totalMessages = 10;
+        setupConsumer(totalMessages, topicPartition);
+        // records have offsets 0..9 10 is commit marker so 11 is ending offset
+        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 11, true, "storeName"));
+
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        replay(active);
+
+        changelogReader.restore(active);
+        assertThat(callback.restored.size(), equalTo(10));
+    }
+
     private void setupConsumer(final long messages,
                                final TopicPartition topicPartition) {
         assignPartition(messages, topicPartition);
@@ -404,11 +517,11 @@ private void assignPartition(final long messages,
                                  final TopicPartition topicPartition) {
         consumer.updatePartitions(topicPartition.topic(),
                                   Collections.singletonList(
-                                          new PartitionInfo(topicPartition.topic(),
-                                                            topicPartition.partition(),
-                                                            null,
-                                                            null,
-                                                            null)));
+                                      new PartitionInfo(topicPartition.topic(),
+                                                        topicPartition.partition(),
+                                                        null,
+                                                        null,
+                                                        null)));
         consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages)));
         consumer.assign(Collections.singletonList(topicPartition));
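
For readers skimming the hunks above, the behavioural core of the fix is how
StoreChangelogReader.processNext picks the offset it reports: it now prefers the
offset of the first record at or beyond the restore limit over the consumer's
current position, and falls back to the consumer position only when a changelog
topic (no offset limit) was not fully drained. Below is a condensed, illustrative
sketch of that decision; the helper name nextRestorePosition is made up, the
StateRestorer type is the internal one from the file above, and the record
collection and batch callbacks of the real method are omitted:

    // Condensed from the processNext() hunk above; illustrative only, not the verbatim method.
    static long nextRestorePosition(final List<ConsumerRecord<byte[], byte[]>> records,
                                    final StateRestorer restorer,
                                    final Consumer<byte[], byte[]> restoreConsumer,
                                    final Long endOffset) {
        long nextPosition = -1;
        int restored = 0;
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            if (restorer.hasCompleted(record.offset(), endOffset)) {
                nextPosition = record.offset();   // first offset at or beyond the restore limit
                break;
            }
            restored++;                           // counted as consumed before reaching the limit
        }
        // A changelog topic (offsetLimit() == Long.MAX_VALUE) should have been consumed in
        // full; if it was not, report the consumer position instead, which lets the caller's
        // "pos > endOffset" check raise TaskMigratedException.
        if (nextPosition == -1
                || (restorer.offsetLimit() == Long.MAX_VALUE && records.size() != restored)) {
            nextPosition = restoreConsumer.position(restorer.partition());
        }
        return nextPosition;
    }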


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> KTable state restore fails after rebalance
> ------------------------------------------
>
>                 Key: KAFKA-6269
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6269
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andreas Schroeder
>            Assignee: Bill Bejeck
>            Priority: Blocker
>             Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                           sourceTopic: String,
>                           existsTopic: String,
>                           valueSerde: Serde[V],
>                           valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>       .withKeySerde(Serdes.String())
>       .withValueSerde(valueSerde)
>       .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions, so there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the processors works fine, but killing one processor and letting it re-join leads to some faulty behaviour in the stream threads.
> First, the total number of assigned partitions across all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition/task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processors half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> StreamsTask taskId: 1_0
> 	ProcessorTopology:
> 		KSTREAM-SOURCE-0000000008:
> 			topics:		[entity-A-exists]
> 			children:	[KTABLE-SOURCE-0000000009]
> 		KTABLE-SOURCE-0000000009:
> 			states:		[entity-A-exists-persisted]
> 			children:	[KTABLE-JOINTHIS-0000000011]
> 		KTABLE-JOINTHIS-0000000011:
> 			states:		[entity-B-exists-persisted]
> 			children:	[KTABLE-MERGE-0000000010]
> 		KTABLE-MERGE-0000000010:
> 			states:		[entity-A-joined-with-entity-B]
> 		KSTREAM-SOURCE-0000000003:
> 			topics:		[entity-B-exists]
> 			children:	[KTABLE-SOURCE-0000000004]
> 		KTABLE-SOURCE-0000000004:
> 			states:		[entity-B-exists-persisted]
> 			children:	[KTABLE-JOINOTHER-0000000012]
> 		KTABLE-JOINOTHER-0000000012:
> 			states:		[entity-A-exists-persisted]
> 			children:	[KTABLE-MERGE-0000000010]
> 		KTABLE-MERGE-0000000010:
> 			states:		[entity-A-joined-with-entity-B]
> Partitions [entity-A-exists-0, entity-B-exists-0]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> That one surprises me: the KTable state store entity-B-exists-persisted is rebuilt from entity-B-exists, which of course can change while the rebuild is happening, since the topic entity-B-exists is fed by another stream thread (see the offset walk-through after this report).
> Another one, very similar:
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-A-exists-24 should not change while restoring: old end offset 6483978, current offset 6485108
> StreamsTask taskId: 1_24
> 	ProcessorTopology:
> 		KSTREAM-SOURCE-0000000008:
> 			topics:		[entity-A-exists]
> 			children:	[KTABLE-SOURCE-0000000009]
> 		KTABLE-SOURCE-0000000009:
> 			states:		[entity-A-exists-persisted]
> 			children:	[KTABLE-JOINTHIS-0000000011]
> 		KTABLE-JOINTHIS-0000000011:
> 			states:		[entity-B-exists-persisted]
> 			children:	[KTABLE-MERGE-0000000010]
> 		KTABLE-MERGE-0000000010:
> 			states:		[entity-A-joined-with-entity-B]
> 		KSTREAM-SOURCE-0000000003:
> 			topics:		[entity-B-exists]
> 			children:	[KTABLE-SOURCE-0000000004]
> 		KTABLE-SOURCE-0000000004:
> 			states:		[entity-B-exists-persisted]
> 			children:	[KTABLE-JOINOTHER-0000000012]
> 		KTABLE-JOINOTHER-0000000012:
> 			states:		[entity-A-exists-persisted]
> 			children:	[KTABLE-MERGE-0000000010]
> 		KTABLE-MERGE-0000000010:
> 			states:		[entity-A-joined-with-entity-B]
> Partitions [entity-A-exists-24, entity-B-exists-24]
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:242)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
> Again, the topic entity-A-exists is fed by another stream thread.
> We saw around 60000 such errors per minute as the stream threads continuously tried to recover and failed.
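
To make the failure mode described above concrete, here is a small numeric walk-through (Java constants for illustration); the end offset and consumer position are taken from the first error report, and the surrounding commentary is an interpretation of the fix in the PR above:

    // Offsets taken from the first TaskMigratedException above; commentary is illustrative.
    final long endOffset        = 4750539L; // log end offset of entity-B-exists-0, captured when restoration started
    final long consumerPosition = 4751388L; // restore consumer position after polling: another stream thread
                                            // keeps producing to entity-B-exists-0 while the store is rebuilt

    // Old behaviour (1.0.0): processNext() returned consumerPosition, so restorePartition()
    // saw 4751388 > 4750539 + 1 and threw TaskMigratedException, even though the store had
    // been restored correctly up to its offset limit.
    // New behaviour (this PR): processNext() returns the offset of the first record at or
    // beyond the restore limit, so concurrent writes to the source topic no longer trip the check.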



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)