You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/02 05:34:33 UTC
[kafka] branch 2.6 updated: MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new d02db92 MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)
d02db92 is described below
commit d02db92ba06e46d7608f002732baf1a8ac0cd141
Author: Rohan <de...@gmail.com>
AuthorDate: Mon Jun 1 22:33:22 2020 -0700
MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)
Co-authored-by: Guozhang Wang <wa...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../kafka/clients/consumer/MockConsumer.java | 9 +++++-
.../processor/internals/StoreChangelogReader.java | 8 +++---
.../internals/StoreChangelogReaderTest.java | 32 ++++++++++++++++++++++
3 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index a7fc1e8..7bf4c3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -65,6 +65,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private KafkaException pollException;
private KafkaException offsetsException;
private AtomicBoolean wakeup;
+ private Duration lastPollTimeout;
private boolean closed;
private boolean shouldRebalance;
@@ -157,13 +158,15 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Deprecated
@Override
public synchronized ConsumerRecords<K, V> poll(long timeout) {
- return poll(Duration.ZERO);
+ return poll(Duration.ofMillis(timeout));
}
@Override
public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
ensureNotClosed();
+ lastPollTimeout = timeout;
+
// Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in
// the callback
synchronized (pollTasks) {
@@ -556,6 +559,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
shouldRebalance = false;
}
+ public Duration lastPollTimeout() {
+ return lastPollTimeout;
+ }
+
@Override
public void close(Duration timeout) {
close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c712bd3..6c6ff39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -216,9 +216,6 @@ public class StoreChangelogReader implements ChangelogReader {
this.restoreConsumer = restoreConsumer;
this.stateRestoreListener = stateRestoreListener;
- // NOTE for restoring active and updating standby we may prefer different poll time
- // in order to make sure we call the main consumer#poll in time.
- // TODO: once both of these are moved to a separate thread this may no longer be a concern
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
this.updateOffsetIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ?
DEFAULT_OFFSET_UPDATE_MS : config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@@ -415,7 +412,10 @@ public class StoreChangelogReader implements ChangelogReader {
final ConsumerRecords<byte[], byte[]> polledRecords;
try {
- polledRecords = restoreConsumer.poll(pollTime);
+ // for restoring active and updating standby we may prefer different poll time
+ // in order to make sure we call the main consumer#poll in time.
+ // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
+ polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
} catch (final InvalidOffsetException e) {
log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
"the consumer's position has fallen out of the topic partition offset range because the topic was " +
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e873e04..6956e79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -46,6 +46,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -227,6 +228,37 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
}
@Test
+ public void shouldPollWithRightTimeout() {
+ EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+ EasyMock.replay(stateManager, storeMetadata, store);
+
+ final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+ @Override
+ public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+ return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
+ }
+ };
+ consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
+
+ final StoreChangelogReader changelogReader =
+ new StoreChangelogReader(time, config, logContext, consumer, callback);
+
+ changelogReader.register(tp, stateManager);
+
+ if (type == STANDBY) {
+ changelogReader.transitToUpdateStandby();
+ }
+
+ changelogReader.restore();
+
+ if (type == ACTIVE) {
+ assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
+ } else {
+ assertEquals(Duration.ZERO, consumer.lastPollTimeout());
+ }
+ }
+
+ @Test
public void shouldRestoreFromPositionAndCheckForCompletion() {
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.replay(stateManager, storeMetadata, store);