You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/04/07 05:07:59 UTC
[kafka] branch 1.1 updated: KAFKA-7192: Cherry-pick 5430 to 1.1
(#6546)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 0e6e58d KAFKA-7192: Cherry-pick 5430 to 1.1 (#6546)
0e6e58d is described below
commit 0e6e58d04cc83d6f46ee62575432ae5301a66bf4
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sat Apr 6 22:07:40 2019 -0700
KAFKA-7192: Cherry-pick 5430 to 1.1 (#6546)
The first PR for KAFKA-7192 was cherry-picked to 1.1, but the follow-up (#5430) was not. This is causing flaky EOS system test failures.
Some test results:
In the 2.0 branch, running 25 times (streams_eos_test has 4 tests, so 100 test runs in total), no failures:
http://confluent-kafka-2-0-system-test-results.s3-us-west-2.amazonaws.com/2019-04-05--001.1554466177--apache--2.0--db22e3d/report.html
In the 1.1 branch before this PR, running 5 times, 10 tests failed:
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-04-02--001.1554239700--guozhangwang--KMinor-1.1-eos-test--8395fce/report.html
In this branch (after this PR), running 25 times, no failures:
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-04-05--001.1554465488--guozhangwang--KMinor-1.1-eos-test--897aa03/report.html
Reviewers: Bill Bejeck <bb...@gmail.com>
---
.../processor/internals/StoreChangelogReader.java | 149 +++++++++------------
.../streams/processor/internals/StreamTask.java | 4 +
.../internals/StoreChangelogReaderTest.java | 99 +++-----------
tests/kafkatest/services/streams.py | 2 +-
4 files changed, 89 insertions(+), 165 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 501ddef..8ade2de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
- private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
- private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+ private final Set<TopicPartition> needsRestoring = new HashSet<>();
+ private final Set<TopicPartition> needsInitializing = new HashSet<>();
+ private final Set<TopicPartition> completedRestorers = new HashSet<>();
public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
final StateRestoreListener userStateRestoreListener,
@@ -61,14 +62,14 @@ public class StoreChangelogReader implements ChangelogReader {
@Override
public void register(final StateRestorer restorer) {
- final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
- if (existingRestorer == null) {
+ if (!stateRestorers.containsKey(restorer.partition())) {
restorer.setUserRestoreListener(userStateRestoreListener);
stateRestorers.put(restorer.partition(), restorer);
- needsInitializing.put(restorer.partition(), restorer);
- } else {
- needsInitializing.put(restorer.partition(), existingRestorer);
+
+ log.trace("Added restorer for changelog {}", restorer.partition());
}
+
+ needsInitializing.add(restorer.partition());
}
/**
@@ -84,18 +85,29 @@ public class StoreChangelogReader implements ChangelogReader {
return completed();
}
- final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
try {
- final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
- for (final TopicPartition partition : restoringPartitions) {
- restorePartition(allRecords, partition, active.restoringTaskFor(partition));
+ final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
+
+ for (final TopicPartition partition : needsRestoring) {
+ final StateRestorer restorer = stateRestorers.get(partition);
+ final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
+ restorer.setRestoredOffset(pos);
+ if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
+ restorer.restoreDone();
+ endOffsets.remove(partition);
+ completedRestorers.add(partition);
+ }
}
} catch (final InvalidOffsetException recoverableException) {
log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
final Set<TopicPartition> partitions = recoverableException.partitions();
for (final TopicPartition partition : partitions) {
final StreamTask task = active.restoringTaskFor(partition);
- log.info("Reinitializing StreamTask {}", task);
+
+ log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
+
+ needsInitializing.remove(partition);
+ needsRestoring.remove(partition);
final StateRestorer restorer = stateRestorers.get(partition);
restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
@@ -104,6 +116,8 @@ public class StoreChangelogReader implements ChangelogReader {
restoreConsumer.seekToBeginning(partitions);
}
+ needsRestoring.removeAll(completedRestorers);
+
if (needsRestoring.isEmpty()) {
restoreConsumer.unsubscribe();
}
@@ -120,25 +134,24 @@ public class StoreChangelogReader implements ChangelogReader {
// the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
refreshChangelogInfo();
- final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
- for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
- final TopicPartition topicPartition = entry.getKey();
+ final Set<TopicPartition> initializable = new HashSet<>();
+ for (final TopicPartition topicPartition : needsInitializing) {
if (hasPartition(topicPartition)) {
- initializable.put(entry.getKey(), entry.getValue());
+ initializable.add(topicPartition);
}
}
// try to fetch end offsets for the initializable restorers and remove any partitions
// where we already have all of the data
try {
- endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
+ endOffsets.putAll(restoreConsumer.endOffsets(initializable));
} catch (final TimeoutException e) {
// if timeout exception gets thrown we just give up this time and retry in the next run loop
log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
return;
}
- final Iterator<TopicPartition> iter = initializable.keySet().iterator();
+ final Iterator<TopicPartition> iter = initializable.iterator();
while (iter.hasNext()) {
final TopicPartition topicPartition = iter.next();
final Long endOffset = endOffsets.get(topicPartition);
@@ -146,13 +159,15 @@ public class StoreChangelogReader implements ChangelogReader {
// offset should not be null; but since the consumer API does not guarantee it
// we add this check just in case
if (endOffset != null) {
- final StateRestorer restorer = needsInitializing.get(topicPartition);
+ final StateRestorer restorer = stateRestorers.get(topicPartition);
if (restorer.checkpoint() >= endOffset) {
restorer.setRestoredOffset(restorer.checkpoint());
iter.remove();
+ completedRestorers.add(topicPartition);
} else if (restorer.offsetLimit() == 0 || endOffset == 0) {
restorer.setRestoredOffset(0);
iter.remove();
+ completedRestorers.add(topicPartition);
} else {
restorer.setEndingOffset(endOffset);
}
@@ -169,55 +184,59 @@ public class StoreChangelogReader implements ChangelogReader {
}
}
- private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+ private void startRestoration(final Set<TopicPartition> initialized,
final RestoringTasks active) {
- log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
+ log.debug("Start restoring state stores from changelog topics {}", initialized);
final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
- assignment.addAll(initialized.keySet());
+ assignment.addAll(initialized);
restoreConsumer.assign(assignment);
final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
- for (final StateRestorer restorer : initialized.values()) {
- final TopicPartition restoringPartition = restorer.partition();
+
+ for (final TopicPartition partition : initialized) {
+ final StateRestorer restorer = stateRestorers.get(partition);
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
- restoreConsumer.seek(restoringPartition, restorer.checkpoint());
- logRestoreOffsets(restoringPartition,
- restorer.checkpoint(),
- endOffsets.get(restoringPartition));
- restorer.setStartingOffset(restoreConsumer.position(restoringPartition));
+ restoreConsumer.seek(partition, restorer.checkpoint());
+ logRestoreOffsets(partition,
+ restorer.checkpoint(),
+ endOffsets.get(partition));
+ restorer.setStartingOffset(restoreConsumer.position(partition));
restorer.restoreStarted();
} else {
- restoreConsumer.seekToBeginning(Collections.singletonList(restoringPartition));
+ restoreConsumer.seekToBeginning(Collections.singletonList(partition));
needsPositionUpdate.add(restorer);
}
}
for (final StateRestorer restorer : needsPositionUpdate) {
- final TopicPartition restoringPartition = restorer.partition();
- final StreamTask task = active.restoringTaskFor(restoringPartition);
+ final TopicPartition partition = restorer.partition();
+
// If checkpoint does not exist it means the task was not shutdown gracefully before;
// and in this case if EOS is turned on we should wipe out the state and re-initialize the task
- if (task.eosEnabled) {
+ final StreamTask task = active.restoringTaskFor(partition);
+ if (task.isEosEnabled()) {
log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
- "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restoringPartition);
- // we move the partitions here, because they will be added back within
- // `task.reinitializeStateStoresForPartitions()` that calls `register()` internally again
- needsInitializing.remove(restoringPartition);
- restorer.setCheckpointOffset(restoreConsumer.position(restoringPartition));
- task.reinitializeStateStoresForPartitions(Collections.singleton(restoringPartition));
+ "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), partition);
+
+ needsInitializing.remove(partition);
+ initialized.remove(partition);
+ restorer.setCheckpointOffset(restoreConsumer.position(partition));
+
+ task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
} else {
- log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);
- final long position = restoreConsumer.position(restoringPartition);
- logRestoreOffsets(restoringPartition,
- position,
- endOffsets.get(restoringPartition));
+ log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), partition);
+
+ final long position = restoreConsumer.position(restorer.partition());
+ logRestoreOffsets(restorer.partition(),
+ position,
+ endOffsets.get(restorer.partition()));
restorer.setStartingOffset(position);
restorer.restoreStarted();
}
}
- needsRestoring.putAll(initialized);
+ needsRestoring.addAll(initialized);
}
private void logRestoreOffsets(final TopicPartition partition,
@@ -230,10 +249,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
private Collection<TopicPartition> completed() {
- final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
- completed.removeAll(needsRestoring.keySet());
- log.trace("The set of restoration completed partitions so far: {}", completed);
- return completed;
+ return completedRestorers;
}
private void refreshChangelogInfo() {
@@ -265,41 +281,6 @@ public class StoreChangelogReader implements ChangelogReader {
needsInitializing.clear();
}
- /**
- * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
- */
- private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
- final TopicPartition topicPartition,
- final Task task) {
- final StateRestorer restorer = stateRestorers.get(topicPartition);
- final Long endOffset = endOffsets.get(topicPartition);
- final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
- restorer.setRestoredOffset(pos);
- if (restorer.hasCompleted(pos, endOffset)) {
- if (pos > endOffset) {
- throw new TaskMigratedException(task, topicPartition, endOffset, pos);
- }
-
- // need to check for changelog topic
- if (restorer.offsetLimit() == Long.MAX_VALUE) {
- final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
- if (!restorer.hasCompleted(pos, updatedEndOffset)) {
- throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
- }
- }
-
-
- log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
- topicPartition,
- restorer.restoredNumRecords(),
- restorer.startingOffset(),
- restorer.restoredOffset());
-
- restorer.restoreDone();
- needsRestoring.remove(topicPartition);
- }
- }
-
private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
final StateRestorer restorer,
final Long endOffset) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6dd4726..6e40c99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -162,6 +162,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
+ public boolean isEosEnabled() {
+ return eosEnabled;
+ }
+
@Override
public boolean initializeStateStores() {
log.trace("Initializing state stores");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 5391f31..0db0216 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
@@ -103,6 +102,7 @@ public class StoreChangelogReaderTest {
expect(mockRestorer.partition())
.andReturn(new TopicPartition("sometopic", 0))
.andReturn(new TopicPartition("sometopic", 0))
+ .andReturn(new TopicPartition("sometopic", 0))
.andReturn(new TopicPartition("sometopic", 0));
EasyMock.replay(mockRestorer);
changelogReader.register(mockRestorer);
@@ -142,11 +142,15 @@ public class StoreChangelogReaderTest {
});
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
- EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task).andReturn(task);
- EasyMock.replay(active);
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task).anyTimes();
+ expect(task.isEosEnabled()).andReturn(false).anyTimes();
+ replay(active, task);
// first restore call "fails" but we should not die with an exception
assertEquals(0, changelogReader.restore(active).size());
+
+ changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+ "storeName"));
// retry restore should succeed
assertEquals(1, changelogReader.restore(active).size());
assertThat(callback.restored.size(), equalTo(messages));
@@ -246,10 +250,11 @@ public class StoreChangelogReaderTest {
changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
+ expect(task.isEosEnabled()).andReturn(false).anyTimes();
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
+ expect(active.restoringTaskFor(topicPartition)).andReturn(task).anyTimes();
+ replay(active, task);
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(10));
@@ -270,9 +275,9 @@ public class StoreChangelogReaderTest {
setupConsumer(3, two);
changelogReader
- .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
- changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
- changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
+ .register(new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE, true, "storeName1"));
+ changelogReader.register(new StateRestorer(one, restoreListener1, 0L, Long.MAX_VALUE, true, "storeName2"));
+ changelogReader.register(new StateRestorer(two, restoreListener2, 0L, Long.MAX_VALUE, true, "storeName3"));
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
@@ -297,11 +302,10 @@ public class StoreChangelogReaderTest {
@Test
public void shouldOnlyReportTheLastRestoredOffset() {
setupConsumer(10, topicPartition);
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
-
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
-
+ changelogReader
+ .register(new StateRestorer(topicPartition, restoreListener, 0L, 5, true, "storeName1"));
+ expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+ replay(active, task);
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(5));
@@ -423,7 +427,8 @@ public class StoreChangelogReaderTest {
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
expect(active.restoringTaskFor(postInitialization)).andReturn(task);
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
+ expect(task.isEosEnabled()).andReturn(false).anyTimes();
+ replay(active, task);
assertTrue(changelogReader.restore(active).isEmpty());
@@ -444,47 +449,6 @@ public class StoreChangelogReaderTest {
}
@Test
- public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
- final int messages = 10;
- setupConsumer(messages, topicPartition);
- consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
-
- try {
- changelogReader.restore(active);
- fail("Should have thrown TaskMigratedException");
- } catch (final TaskMigratedException expected) {
- /* ignore */
- }
- }
-
-
- @Test
- public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
- final int messages = 10;
- setupConsumer(messages, topicPartition);
- // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic
- // so a subsequent call to endOffsets returns a value exceeding the expected end value
- consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
-
- try {
- changelogReader.restore(active);
- fail("Should have thrown TaskMigratedException");
- } catch (final TaskMigratedException expected) {
- // verifies second block threw exception with updated end offset
- assertTrue(expected.getMessage().contains("end offset 15, current offset 10"));
- }
- }
-
-
- @Test
public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
final int messages = 10;
setupConsumer(messages, topicPartition);
@@ -499,31 +463,6 @@ public class StoreChangelogReaderTest {
changelogReader.restore(active);
}
-
- @Test
- public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
- final int totalMessages = 10;
- assignPartition(totalMessages, topicPartition);
- // records 0..4
- addRecords(5, topicPartition, 0);
- //EOS enabled commit marker at offset 5 so rest of records 6..10
- addRecords(5, topicPartition, 6);
- consumer.assign(Collections.<TopicPartition>emptyList());
-
- // end offsets should start after commit marker of 5 from above
- consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
- changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
-
- expect(active.restoringTaskFor(topicPartition)).andReturn(task);
- replay(active);
- try {
- changelogReader.restore(active);
- fail("Should have thrown task migrated exception");
- } catch (final TaskMigratedException expected) {
- /* ignore */
- }
- }
-
@Test
public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
final int totalMessages = 10;
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 0ed3a42..e14f25e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -276,7 +276,7 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
def clean_node(self, node):
if self.clean_node_enabled:
- super.clean_node(self, node)
+ super(StreamsEosTestBaseService, self).clean_node(node)
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):