You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/02 05:34:33 UTC

[kafka] branch 2.6 updated: MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new d02db92  MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)
d02db92 is described below

commit d02db92ba06e46d7608f002732baf1a8ac0cd141
Author: Rohan <de...@gmail.com>
AuthorDate: Mon Jun 1 22:33:22 2020 -0700

    MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)
    
    Co-authored-by: Guozhang Wang <wa...@gmail.com>
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/MockConsumer.java       |  9 +++++-
 .../processor/internals/StoreChangelogReader.java  |  8 +++---
 .../internals/StoreChangelogReaderTest.java        | 32 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index a7fc1e8..7bf4c3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -65,6 +65,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private KafkaException pollException;
     private KafkaException offsetsException;
     private AtomicBoolean wakeup;
+    private Duration lastPollTimeout;
     private boolean closed;
     private boolean shouldRebalance;
 
@@ -157,13 +158,15 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Deprecated
     @Override
     public synchronized ConsumerRecords<K, V> poll(long timeout) {
-        return poll(Duration.ZERO);
+        return poll(Duration.ofMillis(timeout));
     }
 
     @Override
     public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         ensureNotClosed();
 
+        lastPollTimeout = timeout;
+
         // Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in
         // the callback
         synchronized (pollTasks) {
@@ -556,6 +559,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         shouldRebalance = false;
     }
 
+    public Duration lastPollTimeout() {
+        return lastPollTimeout;
+    }
+
     @Override
     public void close(Duration timeout) {
         close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c712bd3..6c6ff39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -216,9 +216,6 @@ public class StoreChangelogReader implements ChangelogReader {
         this.restoreConsumer = restoreConsumer;
         this.stateRestoreListener = stateRestoreListener;
 
-        // NOTE for restoring active and updating standby we may prefer different poll time
-        // in order to make sure we call the main consumer#poll in time.
-        // TODO: once both of these are moved to a separate thread this may no longer be a concern
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.updateOffsetIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ?
             DEFAULT_OFFSET_UPDATE_MS : config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@@ -415,7 +412,10 @@ public class StoreChangelogReader implements ChangelogReader {
             final ConsumerRecords<byte[], byte[]> polledRecords;
 
             try {
-                polledRecords = restoreConsumer.poll(pollTime);
+                // for restoring active and updating standby we may prefer different poll time
+                // in order to make sure we call the main consumer#poll in time.
+                // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
+                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
                 log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e873e04..6956e79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -46,6 +46,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -227,6 +228,37 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     }
 
     @Test
+    public void shouldPollWithRightTimeout() {
+        EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.replay(stateManager, storeMetadata, store);
+
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
+            }
+        };
+        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
+
+        final StoreChangelogReader changelogReader =
+                new StoreChangelogReader(time, config, logContext, consumer, callback);
+
+        changelogReader.register(tp, stateManager);
+
+        if (type == STANDBY) {
+            changelogReader.transitToUpdateStandby();
+        }
+
+        changelogReader.restore();
+
+        if (type == ACTIVE) {
+            assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
+        } else {
+            assertEquals(Duration.ZERO, consumer.lastPollTimeout());
+        }
+    }
+
+    @Test
     public void shouldRestoreFromPositionAndCheckForCompletion() {
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
         EasyMock.replay(stateManager, storeMetadata, store);