You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/18 15:05:17 UTC

[kafka] branch trunk updated: KAFKA-5697: revert wakeup-based impl (#5035)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 58a910f  KAFKA-5697: revert wakeup-based impl (#5035)
58a910f is described below

commit 58a910f0a7e6e2da4af9f1fcebdabeb22d909fcb
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 18 10:05:09 2018 -0500

    KAFKA-5697: revert wakeup-based impl (#5035)
    
    The wakeup-based strategy caused more problems than it
    solved, so we'll instead focus on KIP-266.
    
    Revert commit 2d8049b.
    
    Keep the metrics addition and the new test util.
    
    Also keep the tests for shutdown, although they must be ignored until
    poll(Duration) is done in the scope of KIP-266.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/streams/errors/ShutdownException.java    | 31 ------------------
 .../streams/processor/internals/ConsumerUtils.java | 38 ----------------------
 .../internals/GlobalStateManagerImpl.java          | 35 +++++---------------
 .../processor/internals/GlobalStreamThread.java    | 38 ++--------------------
 .../processor/internals/StoreChangelogReader.java  |  4 +--
 .../streams/processor/internals/StreamThread.java  | 11 +++----
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  1 +
 .../integration/utils/IntegrationTestUtils.java    |  4 +--
 .../apache/kafka/streams/perf/SimpleBenchmark.java |  6 ++--
 .../internals/GlobalStateManagerImplTest.java      | 20 +++---------
 .../processor/internals/StandbyTaskTest.java       |  5 ++-
 .../streams/tests/BrokerCompatibilityTest.java     |  4 +--
 .../apache/kafka/streams/tests/EosTestDriver.java  |  8 ++---
 .../kafka/streams/tests/SmokeTestDriver.java       |  4 +--
 .../kafka/streams/tools/StreamsResetterTest.java   | 21 ++++++------
 .../apache/kafka/streams/TopologyTestDriver.java   |  8 +----
 16 files changed, 42 insertions(+), 196 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
deleted file mode 100644
index d404642..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.errors;
-
-public class ShutdownException extends StreamsException {
-    public ShutdownException(final String message) {
-        super(message);
-    }
-
-    public ShutdownException(final String message, final Throwable throwable) {
-        super(message, throwable);
-    }
-
-    public ShutdownException(final Throwable throwable) {
-        super(throwable);
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
deleted file mode 100644
index 8b91257..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-
-import java.util.Collections;
-import java.util.List;
-
-public final class ConsumerUtils {
-    private ConsumerUtils() {}
-
-    public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
-        try {
-            return consumer.poll(maxDurationMs);
-        } catch (final WakeupException e) {
-            return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 017f2da..e8ec5e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,14 +23,12 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -48,8 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
@@ -64,15 +60,13 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
-    private final IsRunning isRunning;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
-                                  final StreamsConfig config,
-                                  final IsRunning isRunning) {
+                                  final StreamsConfig config) {
         super(stateDirectory.globalStateDir());
 
         this.log = logContext.logger(GlobalStateManagerImpl.class);
@@ -82,11 +76,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.isRunning = isRunning;
-    }
-
-    public interface IsRunning {
-        boolean check();
     }
 
     @Override
@@ -211,13 +200,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
             try {
                 partitionInfos = globalConsumer.partitionsFor(sourceTopic);
                 break;
-            } catch (final WakeupException wakeupException) {
-                if (isRunning.check()) {
-                    // note we may decide later that this condition is ok and just let the retry loop continue
-                    throw new IllegalStateException("Got unexpected WakeupException during initialization.", wakeupException);
-                } else {
-                    throw new ShutdownException("Shutting down from fetching partitions");
-                }
             } catch (final TimeoutException retryableException) {
                 if (++attempts > retries) {
                     log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " +
@@ -268,20 +250,19 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
 
             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            final BatchingStateRestoreCallback stateRestoreAdapter =
-                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof BatchingStateRestoreCallback)
-                    ? stateRestoreCallback
-                    : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
+            BatchingStateRestoreCallback
+                stateRestoreAdapter =
+                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
+                                                     BatchingStateRestoreCallback)
+                                                ? stateRestoreCallback
+                                                : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
-                if (!isRunning.check()) {
-                    throw new ShutdownException("Streams is not running (any more)");
-                }
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = poll(globalConsumer, 100);
+                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 4b6bfb1..112011f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
-import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -43,7 +42,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
 
@@ -108,10 +106,6 @@ public class GlobalStreamThread extends Thread {
             return equals(RUNNING);
         }
 
-        public boolean isStarting() {
-            return equals(CREATED);
-        }
-
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
             final State tmpState = (State) newState;
@@ -179,12 +173,6 @@ public class GlobalStreamThread extends Thread {
         }
     }
 
-    private boolean stillStarting() {
-        synchronized (stateLock) {
-            return state.isStarting();
-        }
-    }
-
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -247,7 +235,7 @@ public class GlobalStreamThread extends Thread {
 
         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = poll(globalConsumer, pollMs);
+                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -278,19 +266,7 @@ public class GlobalStreamThread extends Thread {
 
     @Override
     public void run() {
-        final StateConsumer stateConsumer;
-        try {
-            stateConsumer = initialize();
-        } catch (final ShutdownException e) {
-            log.info("Shutting down from initialization");
-            // Almost certainly, we arrived here because the state is already PENDING_SHUTDOWN, but it's harmless to
-            // just make sure
-            setState(State.PENDING_SHUTDOWN);
-            setState(State.DEAD);
-            streamsMetrics.removeAllThreadLevelSensors();
-            log.info("Shutdown complete");
-            return;
-        }
+        final StateConsumer stateConsumer = initialize();
 
         if (stateConsumer == null) {
             // during initialization, the caller thread would wait for the state consumer
@@ -342,14 +318,7 @@ public class GlobalStreamThread extends Thread {
                 globalConsumer,
                 stateDirectory,
                 stateRestoreListener,
-                config,
-                new GlobalStateManagerImpl.IsRunning() {
-                    @Override
-                    public boolean check() {
-                        return stillStarting() || stillRunning();
-                    }
-                }
-            );
+                config);
 
             final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
                 config,
@@ -402,7 +371,6 @@ public class GlobalStreamThread extends Thread {
         // one could call shutdown() multiple times, so ignore subsequent calls
         // if already shutting down or dead
         setState(PENDING_SHUTDOWN);
-        globalConsumer.wakeup();
     }
 
     public Map<MetricName, Metric> consumerMetrics() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 9b67fc4..5fcba76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -40,8 +40,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class StoreChangelogReader implements ChangelogReader {
 
     private final Logger log;
@@ -83,7 +81,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
         final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
+            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
             for (final TopicPartition partition : restoringPartitions) {
                 restorePartition(allRecords, partition, active.restoringTaskFor(partition));
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ddefd9c..3080d2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -65,7 +65,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 
 public class StreamThread extends Thread {
 
@@ -832,7 +831,7 @@ public class StreamThread extends Thread {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {
-            records = poll(consumer, pollTimeMs);
+            records = consumer.poll(pollTimeMs);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1059,7 +1058,7 @@ public class StreamThread extends Thread {
             }
 
             try {
-                final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 0);
+                final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
 
                 if (!records.isEmpty()) {
                     for (final TopicPartition partition : records.partitions()) {
@@ -1124,8 +1123,6 @@ public class StreamThread extends Thread {
     public void shutdown() {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
-        consumer.wakeup();
-        restoreConsumer.wakeup();
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1229,10 +1226,10 @@ public class StreamThread extends Thread {
                 result.putAll(producerMetrics);
             }
         } else {
-            // When EOS is turned on, each task will has its own producer client
+            // When EOS is turned on, each task will have its own producer client
             // and the producer object passed in here will be null. We would then iterate through
             // all the active tasks and add their metrics to the output metrics map.
-            for (StreamTask task: taskManager.activeTasks().values()) {
+            for (final StreamTask task: taskManager.activeTasks().values()) {
                 final Map<MetricName, ? extends Metric> taskProducerMetrics = task.getProducer().metrics();
                 result.putAll(taskProducerMetrics);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index cdfc470..904ebe2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -253,6 +253,7 @@ public class KafkaStreamsTest {
         // There's nothing to assert... We're testing that this operation actually completes.
     }
 
+    @Ignore // this test cannot pass until we implement KIP-266
     @Test
     public void testLocalThreadCloseWithoutConnectingToBroker() {
         final Properties props = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index ec97f12..d306ee4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -51,8 +51,6 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 /**
  * Utility functions to make integration testing more convenient.
  */
@@ -390,7 +388,7 @@ public class IntegrationTestUtils {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = poll(consumer, pollIntervalMs);
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
 
             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index a660e67..8187467 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -62,8 +62,6 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
  * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
@@ -336,7 +334,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
@@ -374,7 +372,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 7935271..2ca9c21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -79,14 +79,6 @@ public class GlobalStateManagerImplTest {
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private final TopicPartition t3 = new TopicPartition("t3", 1);
     private final TopicPartition t4 = new TopicPartition("t4", 1);
-
-    private final GlobalStateManagerImpl.IsRunning alwaysRunning = new GlobalStateManagerImpl.IsRunning() {
-        @Override
-        public boolean check() {
-            return true;
-        }
-    };
-
     private GlobalStateManagerImpl stateManager;
     private StateDirectory stateDirectory;
     private StreamsConfig streamsConfig;
@@ -127,8 +119,7 @@ public class GlobalStateManagerImplTest {
             consumer,
             stateDirectory,
             stateRestoreListener,
-            streamsConfig,
-            alwaysRunning);
+            streamsConfig);
         processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
         stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -516,8 +507,7 @@ public class GlobalStateManagerImplTest {
                 }
             },
             stateRestoreListener,
-            streamsConfig,
-            alwaysRunning
+            streamsConfig
         );
 
         try {
@@ -555,8 +545,7 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig,
-                alwaysRunning);
+                streamsConfig);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -589,8 +578,7 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig,
-                alwaysRunning);
+                streamsConfig);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 486b35e..93d6a0d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -62,7 +62,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singleton;
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -189,7 +188,7 @@ public class StandbyTaskTest {
         }
 
         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, poll(restoreStateConsumer, 100).records(partition2));
+        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -246,7 +245,7 @@ public class StandbyTaskTest {
         }
 
         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, poll(restoreStateConsumer, 100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index c420d98..e897088 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -42,8 +42,6 @@ import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class BrokerCompatibilityTest {
 
     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
@@ -155,7 +153,7 @@ public class BrokerCompatibilityTest {
             consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
             while (true) {
-                final ConsumerRecords<String, String> records = poll(consumer, 100);
+                final ConsumerRecords<String, String> records = consumer.poll(100);
                 for (final ConsumerRecord<String, String> record : records) {
                     if (record.key().equals("key") && record.value().equals("1")) {
                         return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 513d592..752cdd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -52,8 +52,6 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class EosTestDriver extends SmokeTestUtil {
 
     private static final int MAX_NUMBER_OF_KEYS = 100;
@@ -256,7 +254,7 @@ public class EosTestDriver extends SmokeTestUtil {
                 topics.add("repartition");
             }
             consumer.subscribe(topics);
-            poll(consumer, 0);
+            consumer.poll(0);
 
             final Set<TopicPartition> partitions = new HashSet<>();
             for (final String topic : topics) {
@@ -286,7 +284,7 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = poll(consumer, 100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -593,7 +591,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = poll(consumer, 100);
+            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 74eac3f..50330a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -47,8 +47,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class SmokeTestDriver extends SmokeTestUtil {
 
     public static final int MAX_RECORD_EMPTY_RETRIES = 60;
@@ -291,7 +289,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index 49f699e..ad19f32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -40,7 +40,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -75,7 +74,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(3, records.count());
     }
 
@@ -91,7 +90,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -107,7 +106,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -123,7 +122,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -139,7 +138,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(5, records.count());
     }
 
@@ -155,7 +154,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -173,7 +172,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -191,7 +190,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -209,7 +208,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -227,7 +226,7 @@ public class StreamsResetterTest {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7b8bdd5..c237ca7 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -309,13 +309,7 @@ public class TopologyTestDriver implements Closeable {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig,
-                new GlobalStateManagerImpl.IsRunning() {
-                    @Override
-                    public boolean check() {
-                        return true;
-                    }
-                });
+                streamsConfig);
 
             final GlobalProcessorContextImpl globalProcessorContext
                 = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.