You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/08 17:54:51 UTC

[kafka] branch trunk updated: KAFKA-5697: Use nonblocking poll in Streams (#5107)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 74bdafe  KAFKA-5697: Use nonblocking poll in Streams (#5107)
74bdafe is described below

commit 74bdafe386ae6080d6f5ead852f9026061a65be4
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jun 8 12:54:26 2018 -0500

    KAFKA-5697: Use nonblocking poll in Streams (#5107)
    
    Make use of the new Consumer#poll(Duration) to avoid getting stuck in poll when the broker is unavailable.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../internals/GlobalStateManagerImpl.java          |   5 +-
 .../processor/internals/GlobalStreamThread.java    |  14 +-
 .../processor/internals/StoreChangelogReader.java  |   7 +-
 .../streams/processor/internals/StreamThread.java  |  28 +--
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  23 ++-
 .../integration/utils/IntegrationTestUtils.java    |   3 +-
 .../apache/kafka/streams/perf/SimpleBenchmark.java |   5 +-
 .../processor/internals/AbstractTaskTest.java      |   3 +-
 .../processor/internals/StandbyTaskTest.java       |  12 +-
 .../processor/internals/StateConsumerTest.java     |   5 +-
 .../internals/StoreChangelogReaderTest.java        |   5 +-
 .../processor/internals/StreamTaskTest.java        |   3 +-
 .../StreamThreadStateStoreProviderTest.java        |   3 +-
 .../streams/tests/BrokerCompatibilityTest.java     |   3 +-
 .../apache/kafka/streams/tests/EosTestDriver.java  | 190 ++++++++++-----------
 .../kafka/streams/tests/SmokeTestDriver.java       |   3 +-
 .../kafka/streams/tools/StreamsResetterTest.java   |  21 +--
 .../org/apache/kafka/test/MockRestoreConsumer.java |   4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +
 19 files changed, 188 insertions(+), 151 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9..4fd7a59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -60,6 +61,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
+    private final Duration pollTime;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -76,6 +78,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
     }
 
     @Override
@@ -262,7 +265,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
 
             while (offset < highWatermark) {
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
+                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 112011f..9d529c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -200,7 +201,7 @@ public class GlobalStreamThread extends Thread {
         private final Consumer<byte[], byte[]> globalConsumer;
         private final GlobalStateMaintainer stateMaintainer;
         private final Time time;
-        private final long pollMs;
+        private final Duration pollTime;
         private final long flushInterval;
         private final Logger log;
 
@@ -210,13 +211,13 @@ public class GlobalStreamThread extends Thread {
                       final Consumer<byte[], byte[]> globalConsumer,
                       final GlobalStateMaintainer stateMaintainer,
                       final Time time,
-                      final long pollMs,
+                      final Duration pollTime,
                       final long flushInterval) {
             this.log = logContext.logger(getClass());
             this.globalConsumer = globalConsumer;
             this.stateMaintainer = stateMaintainer;
             this.time = time;
-            this.pollMs = pollMs;
+            this.pollTime = pollTime;
             this.flushInterval = flushInterval;
         }
 
@@ -235,7 +236,7 @@ public class GlobalStreamThread extends Thread {
 
         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
+                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -338,8 +339,9 @@ public class GlobalStreamThread extends Thread {
                     logContext
                 ),
                 time,
-                config.getLong(StreamsConfig.POLL_MS_CONFIG),
-                config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+                Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
+                config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
+            );
             stateConsumer.initialize();
 
             return stateConsumer;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bb0ed06..07af801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -49,11 +50,14 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private final Duration pollTime;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
+                                final Duration pollTime,
                                 final StateRestoreListener userStateRestoreListener,
                                 final LogContext logContext) {
         this.restoreConsumer = restoreConsumer;
+        this.pollTime = pollTime;
         this.log = logContext.logger(getClass());
         this.userStateRestoreListener = userStateRestoreListener;
     }
@@ -76,7 +80,7 @@ public class StoreChangelogReader implements ChangelogReader {
         }
 
         try {
-            final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
             final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
             while (iterator.hasNext()) {
                 final TopicPartition partition = iterator.next();
@@ -295,6 +299,7 @@ public class StoreChangelogReader implements ChangelogReader {
                 return true;
             }
         }
+
         return false;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e72c4a5..a159e7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -50,6 +50,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -212,7 +213,7 @@ public class StreamThread extends Thread {
             if (newState == State.RUNNING) {
                 updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
             } else {
-                updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
+                updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
             }
         }
 
@@ -555,7 +556,7 @@ public class StreamThread extends Thread {
     }
 
     private final Time time;
-    private final long pollTimeMs;
+    private final Duration pollTime;
     private final long commitTimeMs;
     private final Object stateLock;
     private final Logger log;
@@ -602,7 +603,8 @@ public class StreamThread extends Thread {
         log.info("Creating restore consumer client");
         final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
+        final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, logContext);
 
         Producer<byte[], byte[]> threadProducer = null;
         final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
@@ -710,10 +712,10 @@ public class StreamThread extends Thread {
         this.originalReset = originalReset;
         this.versionProbingFlag = versionProbingFlag;
 
-        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
-        updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
+        updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
     }
 
     /**
@@ -801,14 +803,14 @@ public class StreamThread extends Thread {
         if (state == State.PARTITIONS_ASSIGNED) {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
-            records = pollRequests(0L);
+            records = pollRequests(Duration.ZERO);
 
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
         } else {
             // try to fetch some records if necessary
-            records = pollRequests(pollTimeMs);
+            records = pollRequests(pollTime);
 
             // if state changed after the poll call,
             // try to initialize the assigned tasks again
@@ -843,15 +845,15 @@ public class StreamThread extends Thread {
     /**
      * Get the next batch of records by polling.
      *
-     * @param pollTimeMs poll time millis parameter for the consumer poll
+     * @param pollTime how long to block in Consumer#poll
      * @return Next batch of records or null if no records available.
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
+    private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {
-            records = consumer.poll(pollTimeMs);
+            records = consumer.poll(pollTime);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1078,7 +1080,11 @@ public class StreamThread extends Thread {
             }
 
             try {
-                final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+                // poll(0): Since this is during the normal processing, not during restoration.
+                // We can afford to have slower restore (because we don't wait inside poll for results).
+                // Instead, we want to proceed to the next iteration to call the main consumer#poll()
+                // as soon as possible so as to not be kicked out of the group.
+                final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(Duration.ZERO);
 
                 if (!records.isEmpty()) {
                     for (final TopicPartition partition : records.partitions()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 297b243..8635b94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -25,6 +27,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -42,7 +45,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -234,9 +236,8 @@ public class KafkaStreamsTest {
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
     }
 
-    @Ignore // this test cannot pass as long as GST blocks KS.start()
     @Test
-    public void testGlobalThreadCloseWithoutConnectingToBroker() {
+    public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
@@ -244,16 +245,26 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        // We want to configure request.timeout.ms, but it must be larger than a
+        // few other configs.
+        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200);
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 201);
+        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 202);
+
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.start();
-        streams.close();
+        try {
+            streams.start();
+            fail("expected start() to time out and throw an exception.");
+        } catch (final StreamsException expected) {
+            // This is a result of not being able to connect to the broker.
+        }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
 
-    @Ignore // this test cannot pass until we implement KIP-266
     @Test
     public void testLocalThreadCloseWithoutConnectingToBroker() {
         final Properties props = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index fe897c7..86cb331 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -44,6 +44,7 @@ import scala.Option;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -464,7 +465,7 @@ public class IntegrationTestUtils {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumerRecords.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(pollIntervalMs));
 
             for (final ConsumerRecord<K, V> record : records) {
                 consumerRecords.add(record);
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 8187467..7179293 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -334,7 +335,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
@@ -372,7 +373,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 347e9c4..4ed44be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -233,7 +234,7 @@ public class AbstractTaskTest {
                                 storeTopicPartitions,
                                 ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
                                 consumer,
-                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
+                                new StoreChangelogReader(consumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
                                 false,
                                 stateDirectory,
                                 config) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 93d6a0d..05d0e3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -50,6 +50,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -122,7 +123,12 @@ public class StandbyTaskTest {
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(
+        restoreStateConsumer,
+        Duration.ZERO,
+        stateRestoreListener,
+        new LogContext("standby-task-test ")
+    );
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -188,7 +194,7 @@ public class StandbyTaskTest {
         }
 
         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -245,7 +251,7 @@ public class StandbyTaskTest {
         }
 
         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 725211d..140f705 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -52,7 +53,7 @@ public class StateConsumerTest {
         partitionOffsets.put(topicOne, 20L);
         partitionOffsets.put(topicTwo, 30L);
         stateMaintainer = new StateMaintainerStub(partitionOffsets);
-        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), FLUSH_INTERVAL);
     }
 
     @Test
@@ -109,7 +110,7 @@ public class StateConsumerTest {
 
     @Test
     public void shouldNotFlushWhenFlushIntervalIsZero() {
-        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, -1);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), -1);
         stateConsumer.initialize();
         time.sleep(100);
         stateConsumer.pollAndUpdate();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index aabe7ff..90abf32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -39,6 +39,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -71,7 +72,7 @@ public class StoreChangelogReaderTest {
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
     private final LogContext logContext = new LogContext("test-reader ");
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
 
     @Before
     public void setUp() {
@@ -89,7 +90,7 @@ public class StoreChangelogReaderTest {
             }
         };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3a0fc4e..5537335 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -56,6 +56,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -116,7 +117,7 @@ public class StreamTaskTest {
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")) {
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Duration.ZERO, stateRestoreListener, new LogContext("stream-task-test ")) {
         @Override
         public Map<TopicPartition, Long> restoredOffsets() {
             return Collections.singletonMap(changelogPartition, offset);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c24122a..66ea3c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -49,6 +49,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -177,7 +178,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
             streamsConfig,
             new MockStreamsMetrics(metrics),
             stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index e897088..3c8446c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Properties;
@@ -153,7 +154,7 @@ public class BrokerCompatibilityTest {
             consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
             while (true) {
-                final ConsumerRecords<String, String> records = consumer.poll(100);
+                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                 for (final ConsumerRecord<String, String> record : records) {
                     if (record.key().equals("key") && record.value().equals("1")) {
                         return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 752cdd6..0b18864 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -16,16 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
-import kafka.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
@@ -40,17 +42,18 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
-import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class EosTestDriver extends SmokeTestUtil {
 
@@ -59,22 +62,19 @@ public class EosTestDriver extends SmokeTestUtil {
 
     private static boolean isRunning = true;
 
-    static int numRecordsProduced = 0;
+    private static int numRecordsProduced = 0;
 
-    static synchronized void updateNumRecordsProduces(final int delta) {
+    private static synchronized void updateNumRecordsProduces(final int delta) {
         numRecordsProduced += delta;
     }
 
     static void generate(final String kafka) {
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                System.out.println("Terminating");
-                System.out.flush();
-                isRunning = false;
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            System.out.println("Terminating");
+            System.out.flush();
+            isRunning = false;
+        }));
 
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
@@ -93,19 +93,16 @@ public class EosTestDriver extends SmokeTestUtil {
 
             final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
 
-            producer.send(record, new Callback() {
-                @Override
-                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                    if (exception != null) {
-                        exception.printStackTrace(System.err);
-                        System.err.flush();
-                        if (exception instanceof TimeoutException) {
-                            try {
-                                // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
-                                final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
-                                updateNumRecordsProduces(-expired);
-                            } catch (Exception ignore) { }
-                        }
+            producer.send(record, (metadata, exception) -> {
+                if (exception != null) {
+                    exception.printStackTrace(System.err);
+                    System.err.flush();
+                    if (exception instanceof TimeoutException) {
+                        try {
+                            // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
+                            final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
+                            updateNumRecordsProduces(-expired);
+                        } catch (final Exception ignore) { }
                     }
                 }
             });
@@ -141,10 +138,6 @@ public class EosTestDriver extends SmokeTestUtil {
     }
 
     public static void verify(final String kafka, final boolean withRepartitioning) {
-        ensureStreamsApplicationDown(kafka);
-
-        final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka, withRepartitioning);
-
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -152,6 +145,13 @@ public class EosTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
 
+        final Map<TopicPartition, Long> committedOffsets;
+        try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
+            ensureStreamsApplicationDown(adminClient);
+
+            committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
+        }
+
         final String[] allInputTopics;
         final String[] allOutputTopics;
         if (withRepartitioning) {
@@ -218,54 +218,42 @@ public class EosTestDriver extends SmokeTestUtil {
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final String kafka) {
-        AdminClient adminClient = null;
-        try {
-            adminClient = AdminClient.createSimplePlaintext(kafka);
+    private static void ensureStreamsApplicationDown(final AdminClient adminClient) {
 
-            final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-            while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
-                if (System.currentTimeMillis() > maxWaitTime) {
-                    throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
-                }
-                sleep(1000);
-            }
-        } finally {
-            if (adminClient != null) {
-                adminClient.close();
+        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        ConsumerGroupDescription description;
+        do {
+            description = getConsumerGroupDescription(adminClient);
+
+            if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
+                throw new RuntimeException(
+                    "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds. " +
+                        "Group: " + description
+                );
             }
-        }
+            sleep(1000);
+        } while (!description.members().isEmpty());
     }
 
-    private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka,
+
+    private static Map<TopicPartition, Long> getCommittedOffsets(final AdminClient adminClient,
                                                                  final boolean withRepartitioning) {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap;
 
-        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
-        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
-            final Set<String> topics = new HashSet<>();
-            topics.add("data");
-            if (withRepartitioning) {
-                topics.add("repartition");
-            }
-            consumer.subscribe(topics);
-            consumer.poll(0);
+        try {
+            final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID);
+            topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
 
-            final Set<TopicPartition> partitions = new HashSet<>();
-            for (final String topic : topics) {
-                for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
-                    partitions.add(new TopicPartition(partition.topic(), partition.partition()));
-                }
-            }
+        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
 
-            for (final TopicPartition tp : partitions) {
-                final long offset = consumer.position(tp);
-                committedOffsets.put(tp, offset);
+        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
+            final String topic = entry.getKey().topic();
+            if (topic.equals("data") || withRepartitioning && topic.equals("repartition")) {
+                committedOffsets.put(entry.getKey(), entry.getValue().offset());
             }
         }
 
@@ -284,7 +272,7 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(Duration.ofMillis(100));
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -327,19 +315,12 @@ public class EosTestDriver extends SmokeTestUtil {
         final TopicPartition partition = new TopicPartition(topic, record.partition());
 
         if (verifyTopic(topic, withRepartitioning)) {
-            Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition
-                = recordPerTopicPerPartition.get(topic);
+            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition =
+                recordPerTopicPerPartition.computeIfAbsent(topic, k -> new HashMap<>());
 
-            if (topicRecordsPerPartition == null) {
-                topicRecordsPerPartition = new HashMap<>();
-                recordPerTopicPerPartition.put(topic, topicRecordsPerPartition);
-            }
+            final List<ConsumerRecord<byte[], byte[]>> records =
+                topicRecordsPerPartition.computeIfAbsent(partition, k -> new ArrayList<>());
 
-            List<ConsumerRecord<byte[], byte[]>> records = topicRecordsPerPartition.get(partition);
-            if (records == null) {
-                records = new ArrayList<>();
-                topicRecordsPerPartition.put(partition, records);
-            }
             records.add(record);
         } else {
             throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
@@ -397,7 +378,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
             if (partitionInput.size() != partitionMin.size()) {
                 throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + partitionMin.size());
+                    + partitionRecords.getKey() + " but received " + partitionMin.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -439,7 +420,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
             if (partitionInput.size() != partitionSum.size()) {
                 throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + partitionSum.size());
+                    + partitionRecords.getKey() + " but received " + partitionSum.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -480,7 +461,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
             if (partitionInput.size() != partitionMax.size()) {
                 throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + partitionMax.size());
+                    + partitionRecords.getKey() + " but received " + partitionMax.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -501,7 +482,7 @@ public class EosTestDriver extends SmokeTestUtil {
                 max = Math.max(max, value);
                 currentMinPerKey.put(key, max);
 
-                if (!receivedKey.equals(key) || receivedValue != max.intValue()) {
+                if (!receivedKey.equals(key) || receivedValue != max) {
                     throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + max + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
             }
@@ -521,7 +502,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
             if (partitionInput.size() != partitionCnt.size()) {
                 throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + partitionCnt.size());
+                    + partitionRecords.getKey() + " but received " + partitionCnt.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -539,7 +520,7 @@ public class EosTestDriver extends SmokeTestUtil {
                 }
                 currentSumPerKey.put(key, ++cnt);
 
-                if (!receivedKey.equals(key) || receivedValue != cnt.longValue()) {
+                if (!receivedKey.equals(key) || receivedValue != cnt) {
                     throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
             }
@@ -574,14 +555,11 @@ public class EosTestDriver extends SmokeTestUtil {
             for (final TopicPartition tp : partitions) {
                 final ProducerRecord<String, String> record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
 
-                producer.send(record, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception != null) {
-                            exception.printStackTrace(System.err);
-                            System.err.flush();
-                            Exit.exit(1);
-                        }
+                producer.send(record, (metadata, exception) -> {
+                    if (exception != null) {
+                        exception.printStackTrace(System.err);
+                        System.err.flush();
+                        Exit.exit(1);
                     }
                 });
             }
@@ -591,7 +569,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
@@ -638,4 +616,18 @@ public class EosTestDriver extends SmokeTestUtil {
         return partitions;
     }
 
+
+    private static ConsumerGroupDescription getConsumerGroupDescription(final AdminClient adminClient) {
+        final ConsumerGroupDescription description;
+        try {
+            description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
+                .describedGroups()
+                .get(EosTestClient.APP_ID)
+                .get(10, TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+            e.printStackTrace();
+            throw new RuntimeException("Unexpected Exception getting group description", e);
+        }
+        return description;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 50330a0..7533fdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -289,7 +290,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index ad19f32..33cf1fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -74,7 +75,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(3, records.count());
     }
 
@@ -90,7 +91,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -106,7 +107,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -122,7 +123,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -138,7 +139,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(5, records.count());
     }
 
@@ -154,7 +155,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -172,7 +173,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -190,7 +191,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -208,7 +209,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -226,7 +227,7 @@ public class StreamsResetterTest {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 3070e36..00788fd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,9 +86,8 @@ public class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
         super.assign(partitions);
     }
 
-    @Deprecated
     @Override
-    public ConsumerRecords<byte[], byte[]> poll(long timeout) {
+    public ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
         // add buffered records to MockConsumer
         for (ConsumerRecord<byte[], byte[]> record : recordBuffer) {
             super.addRecord(record);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 773cbb4..7f75265 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -66,6 +66,7 @@ import org.apache.kafka.streams.test.OutputVerifier;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -327,6 +328,7 @@ public class TopologyTestDriver implements Closeable {
                 consumer,
                 new StoreChangelogReader(
                     createRestoreConsumer(processorTopology.storeToChangelogTopic()),
+                    Duration.ZERO,
                     stateRestoreListener,
                     new LogContext("topology-test-driver ")),
                 streamsConfig,

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.