You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/08 17:55:21 UTC
[kafka] branch 2.0 updated: KAFKA-5697: Use nonblocking poll in Streams (#5107)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 3ab6e75 KAFKA-5697: Use nonblocking poll in Streams (#5107)
3ab6e75 is described below
commit 3ab6e75aaebfd68d9b043586b26c46f2c12d4c3a
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jun 8 12:54:26 2018 -0500
KAFKA-5697: Use nonblocking poll in Streams (#5107)
Make use of the new Consumer#poll(Duration) to avoid getting stuck in poll when the broker is unavailable.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../internals/GlobalStateManagerImpl.java | 5 +-
.../processor/internals/GlobalStreamThread.java | 14 +-
.../processor/internals/StoreChangelogReader.java | 7 +-
.../streams/processor/internals/StreamThread.java | 28 +--
.../org/apache/kafka/streams/KafkaStreamsTest.java | 23 ++-
.../integration/utils/IntegrationTestUtils.java | 3 +-
.../apache/kafka/streams/perf/SimpleBenchmark.java | 5 +-
.../processor/internals/AbstractTaskTest.java | 3 +-
.../processor/internals/StandbyTaskTest.java | 12 +-
.../processor/internals/StateConsumerTest.java | 5 +-
.../internals/StoreChangelogReaderTest.java | 5 +-
.../processor/internals/StreamTaskTest.java | 3 +-
.../StreamThreadStateStoreProviderTest.java | 3 +-
.../streams/tests/BrokerCompatibilityTest.java | 3 +-
.../apache/kafka/streams/tests/EosTestDriver.java | 190 ++++++++++-----------
.../kafka/streams/tests/SmokeTestDriver.java | 3 +-
.../kafka/streams/tools/StreamsResetterTest.java | 21 +--
.../org/apache/kafka/test/MockRestoreConsumer.java | 4 +-
.../apache/kafka/streams/TopologyTestDriver.java | 2 +
19 files changed, 188 insertions(+), 151 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9..4fd7a59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -60,6 +61,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
private InternalProcessorContext processorContext;
private final int retries;
private final long retryBackoffMs;
+ private final Duration pollTime;
public GlobalStateManagerImpl(final LogContext logContext,
final ProcessorTopology topology,
@@ -76,6 +78,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
this.stateRestoreListener = stateRestoreListener;
this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
}
@Override
@@ -262,7 +265,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
while (offset < highWatermark) {
try {
- final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
+ final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
if (record.key() != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 112011f..9d529c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -200,7 +201,7 @@ public class GlobalStreamThread extends Thread {
private final Consumer<byte[], byte[]> globalConsumer;
private final GlobalStateMaintainer stateMaintainer;
private final Time time;
- private final long pollMs;
+ private final Duration pollTime;
private final long flushInterval;
private final Logger log;
@@ -210,13 +211,13 @@ public class GlobalStreamThread extends Thread {
final Consumer<byte[], byte[]> globalConsumer,
final GlobalStateMaintainer stateMaintainer,
final Time time,
- final long pollMs,
+ final Duration pollTime,
final long flushInterval) {
this.log = logContext.logger(getClass());
this.globalConsumer = globalConsumer;
this.stateMaintainer = stateMaintainer;
this.time = time;
- this.pollMs = pollMs;
+ this.pollTime = pollTime;
this.flushInterval = flushInterval;
}
@@ -235,7 +236,7 @@ public class GlobalStreamThread extends Thread {
void pollAndUpdate() {
try {
- final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
+ final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime);
for (final ConsumerRecord<byte[], byte[]> record : received) {
stateMaintainer.update(record);
}
@@ -338,8 +339,9 @@ public class GlobalStreamThread extends Thread {
logContext
),
time,
- config.getLong(StreamsConfig.POLL_MS_CONFIG),
- config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+ Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
+ config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
+ );
stateConsumer.initialize();
return stateConsumer;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bb0ed06..07af801 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -49,11 +50,14 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+ private final Duration pollTime;
public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
+ final Duration pollTime,
final StateRestoreListener userStateRestoreListener,
final LogContext logContext) {
this.restoreConsumer = restoreConsumer;
+ this.pollTime = pollTime;
this.log = logContext.logger(getClass());
this.userStateRestoreListener = userStateRestoreListener;
}
@@ -76,7 +80,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
try {
- final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
+ final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
while (iterator.hasNext()) {
final TopicPartition partition = iterator.next();
@@ -295,6 +299,7 @@ public class StoreChangelogReader implements ChangelogReader {
return true;
}
}
+
return false;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e72c4a5..a159e7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -50,6 +50,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -212,7 +213,7 @@ public class StreamThread extends Thread {
if (newState == State.RUNNING) {
updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
} else {
- updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
+ updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
}
}
@@ -555,7 +556,7 @@ public class StreamThread extends Thread {
}
private final Time time;
- private final long pollTimeMs;
+ private final Duration pollTime;
private final long commitTimeMs;
private final Object stateLock;
private final Logger log;
@@ -602,7 +603,8 @@ public class StreamThread extends Thread {
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
- final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
+ final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
+ final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, logContext);
Producer<byte[], byte[]> threadProducer = null;
final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
@@ -710,10 +712,10 @@ public class StreamThread extends Thread {
this.originalReset = originalReset;
this.versionProbingFlag = versionProbingFlag;
- this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+ this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
- updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
+ updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
}
/**
@@ -801,14 +803,14 @@ public class StreamThread extends Thread {
if (state == State.PARTITIONS_ASSIGNED) {
// try to fetch some records with zero poll millis
// to unblock the restoration as soon as possible
- records = pollRequests(0L);
+ records = pollRequests(Duration.ZERO);
if (taskManager.updateNewAndRestoringTasks()) {
setState(State.RUNNING);
}
} else {
// try to fetch some records if necessary
- records = pollRequests(pollTimeMs);
+ records = pollRequests(pollTime);
// if state changed after the poll call,
// try to initialize the assigned tasks again
@@ -843,15 +845,15 @@ public class StreamThread extends Thread {
/**
* Get the next batch of records by polling.
*
- * @param pollTimeMs poll time millis parameter for the consumer poll
+ * @param pollTime how long to block in Consumer#poll
* @return Next batch of records or null if no records available.
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
- private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
+ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
ConsumerRecords<byte[], byte[]> records = null;
try {
- records = consumer.poll(pollTimeMs);
+ records = consumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
resetInvalidOffsets(e);
}
@@ -1078,7 +1080,11 @@ public class StreamThread extends Thread {
}
try {
- final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+ // poll(0): Since this is during the normal processing, not during restoration.
+ // We can afford to have slower restore (because we don't wait inside poll for results).
+ // Instead, we want to proceed to the next iteration to call the main consumer#poll()
+ // as soon as possible so as to not be kicked out of the group.
+ final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(Duration.ZERO);
if (!records.isEmpty()) {
for (final TopicPartition partition : records.partitions()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 297b243..8635b94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -25,6 +27,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@@ -42,7 +45,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -234,9 +236,8 @@ public class KafkaStreamsTest {
assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
}
- @Ignore // this test cannot pass as long as GST blocks KS.start()
@Test
- public void testGlobalThreadCloseWithoutConnectingToBroker() {
+ public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
@@ -244,16 +245,26 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+ // We want to configure request.timeout.ms, but it must be larger than a
+ // few other configs.
+ props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200);
+ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 201);
+ props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 202);
+
final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
- streams.start();
- streams.close();
+ try {
+ streams.start();
+ fail("expected start() to time out and throw an exception.");
+ } catch (final StreamsException expected) {
+ // This is a result of not being able to connect to the broker.
+ }
// There's nothing to assert... We're testing that this operation actually completes.
}
- @Ignore // this test cannot pass until we implement KIP-266
@Test
public void testLocalThreadCloseWithoutConnectingToBroker() {
final Properties props = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index fe897c7..86cb331 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -44,6 +44,7 @@ import scala.Option;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -464,7 +465,7 @@ public class IntegrationTestUtils {
while (totalPollTimeMs < waitTime &&
continueConsuming(consumerRecords.size(), maxMessages)) {
totalPollTimeMs += pollIntervalMs;
- final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+ final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(pollIntervalMs));
for (final ConsumerRecord<K, V> record : records) {
consumerRecords.add(record);
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 8187467..7179293 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -334,7 +335,7 @@ public class SimpleBenchmark {
consumer.seekToBeginning(partitions);
while (true) {
- final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+ final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
@@ -372,7 +373,7 @@ public class SimpleBenchmark {
consumer.seekToBeginning(partitions);
while (true) {
- final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+ final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 347e9c4..4ed44be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -233,7 +234,7 @@ public class AbstractTaskTest {
storeTopicPartitions,
ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
consumer,
- new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
+ new StoreChangelogReader(consumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
false,
stateDirectory,
config) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 93d6a0d..05d0e3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -50,6 +50,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -122,7 +123,12 @@ public class StandbyTaskTest {
private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
- private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
+ private final StoreChangelogReader changelogReader = new StoreChangelogReader(
+ restoreStateConsumer,
+ Duration.ZERO,
+ stateRestoreListener,
+ new LogContext("standby-task-test ")
+ );
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -188,7 +194,7 @@ public class StandbyTaskTest {
}
restoreStateConsumer.seekToBeginning(partition);
- task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+ task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));
StandbyContextImpl context = (StandbyContextImpl) task.context();
MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -245,7 +251,7 @@ public class StandbyTaskTest {
}
// The commit offset is at 0L. Records should not be processed
- List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
+ List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition));
assertEquals(5, remaining.size());
committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 725211d..140f705 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -27,6 +27,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -52,7 +53,7 @@ public class StateConsumerTest {
partitionOffsets.put(topicOne, 20L);
partitionOffsets.put(topicTwo, 30L);
stateMaintainer = new StateMaintainerStub(partitionOffsets);
- stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
+ stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), FLUSH_INTERVAL);
}
@Test
@@ -109,7 +110,7 @@ public class StateConsumerTest {
@Test
public void shouldNotFlushWhenFlushIntervalIsZero() {
- stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, -1);
+ stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), -1);
stateConsumer.initialize();
time.sleep(100);
stateConsumer.pollAndUpdate();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index aabe7ff..90abf32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -39,6 +39,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -71,7 +72,7 @@ public class StoreChangelogReaderTest {
private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
private final TopicPartition topicPartition = new TopicPartition("topic", 0);
private final LogContext logContext = new LogContext("test-reader ");
- private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+ private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
@Before
public void setUp() {
@@ -89,7 +90,7 @@ public class StoreChangelogReaderTest {
}
};
- final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+ final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
changelogReader.restore(active);
assertTrue(functionCalled.get());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3a0fc4e..5537335 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -56,6 +56,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -116,7 +117,7 @@ public class StreamTaskTest {
private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
- private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")) {
+ private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Duration.ZERO, stateRestoreListener, new LogContext("stream-task-test ")) {
@Override
public Map<TopicPartition, Long> restoredOffsets() {
return Collections.singletonMap(changelogPartition, offset);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c24122a..66ea3c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -49,6 +49,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -177,7 +178,7 @@ public class StreamThreadStateStoreProviderTest {
Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
topology,
clientSupplier.consumer,
- new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
+ new StoreChangelogReader(clientSupplier.restoreConsumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
streamsConfig,
new MockStreamsMetrics(metrics),
stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index e897088..3c8446c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.Locale;
import java.util.Properties;
@@ -153,7 +154,7 @@ public class BrokerCompatibilityTest {
consumer.subscribe(Collections.singletonList(SINK_TOPIC));
while (true) {
- final ConsumerRecords<String, String> records = consumer.poll(100);
+ final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (final ConsumerRecord<String, String> record : records) {
if (record.key().equals("key") && record.value().equals("1")) {
return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 752cdd6..0b18864 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -16,16 +16,18 @@
*/
package org.apache.kafka.streams.tests;
-import kafka.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
@@ -40,17 +42,18 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
-import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public class EosTestDriver extends SmokeTestUtil {
@@ -59,22 +62,19 @@ public class EosTestDriver extends SmokeTestUtil {
private static boolean isRunning = true;
- static int numRecordsProduced = 0;
+ private static int numRecordsProduced = 0;
- static synchronized void updateNumRecordsProduces(final int delta) {
+ private static synchronized void updateNumRecordsProduces(final int delta) {
numRecordsProduced += delta;
}
static void generate(final String kafka) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- System.out.println("Terminating");
- System.out.flush();
- isRunning = false;
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ System.out.println("Terminating");
+ System.out.flush();
+ isRunning = false;
+ }));
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
@@ -93,19 +93,16 @@ public class EosTestDriver extends SmokeTestUtil {
final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(final RecordMetadata metadata, final Exception exception) {
- if (exception != null) {
- exception.printStackTrace(System.err);
- System.err.flush();
- if (exception instanceof TimeoutException) {
- try {
- // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
- final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
- updateNumRecordsProduces(-expired);
- } catch (Exception ignore) { }
- }
+ producer.send(record, (metadata, exception) -> {
+ if (exception != null) {
+ exception.printStackTrace(System.err);
+ System.err.flush();
+ if (exception instanceof TimeoutException) {
+ try {
+ // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
+ final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
+ updateNumRecordsProduces(-expired);
+ } catch (final Exception ignore) { }
}
}
});
@@ -141,10 +138,6 @@ public class EosTestDriver extends SmokeTestUtil {
}
public static void verify(final String kafka, final boolean withRepartitioning) {
- ensureStreamsApplicationDown(kafka);
-
- final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka, withRepartitioning);
-
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -152,6 +145,13 @@ public class EosTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+ final Map<TopicPartition, Long> committedOffsets;
+ try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
+ ensureStreamsApplicationDown(adminClient);
+
+ committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
+ }
+
final String[] allInputTopics;
final String[] allOutputTopics;
if (withRepartitioning) {
@@ -218,54 +218,42 @@ public class EosTestDriver extends SmokeTestUtil {
System.out.flush();
}
- private static void ensureStreamsApplicationDown(final String kafka) {
- AdminClient adminClient = null;
- try {
- adminClient = AdminClient.createSimplePlaintext(kafka);
+ private static void ensureStreamsApplicationDown(final AdminClient adminClient) {
- final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
- while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
- if (System.currentTimeMillis() > maxWaitTime) {
- throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
- }
- sleep(1000);
- }
- } finally {
- if (adminClient != null) {
- adminClient.close();
+ final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+ ConsumerGroupDescription description;
+ do {
+ description = getConsumerGroupDescription(adminClient);
+
+ if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
+ throw new RuntimeException(
+ "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds. " +
+ "Group: " + description
+ );
}
- }
+ sleep(1000);
+ } while (!description.members().isEmpty());
}
- private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka,
+
+ private static Map<TopicPartition, Long> getCommittedOffsets(final AdminClient adminClient,
final boolean withRepartitioning) {
- final Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap;
- final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
- try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
- final Set<String> topics = new HashSet<>();
- topics.add("data");
- if (withRepartitioning) {
- topics.add("repartition");
- }
- consumer.subscribe(topics);
- consumer.poll(0);
+ try {
+ final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID);
+ topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
+ } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
- final Set<TopicPartition> partitions = new HashSet<>();
- for (final String topic : topics) {
- for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
- partitions.add(new TopicPartition(partition.topic(), partition.partition()));
- }
- }
+ final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
- for (final TopicPartition tp : partitions) {
- final long offset = consumer.position(tp);
- committedOffsets.put(tp, offset);
+ for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
+ final String topic = entry.getKey().topic();
+ if (topic.equals("data") || withRepartitioning && topic.equals("repartition")) {
+ committedOffsets.put(entry.getKey(), entry.getValue().offset());
}
}
@@ -284,7 +272,7 @@ public class EosTestDriver extends SmokeTestUtil {
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
boolean allRecordsReceived = false;
while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
- final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
+ final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(Duration.ofMillis(100));
for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -327,19 +315,12 @@ public class EosTestDriver extends SmokeTestUtil {
final TopicPartition partition = new TopicPartition(topic, record.partition());
if (verifyTopic(topic, withRepartitioning)) {
- Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition
- = recordPerTopicPerPartition.get(topic);
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition =
+ recordPerTopicPerPartition.computeIfAbsent(topic, k -> new HashMap<>());
- if (topicRecordsPerPartition == null) {
- topicRecordsPerPartition = new HashMap<>();
- recordPerTopicPerPartition.put(topic, topicRecordsPerPartition);
- }
+ final List<ConsumerRecord<byte[], byte[]>> records =
+ topicRecordsPerPartition.computeIfAbsent(partition, k -> new ArrayList<>());
- List<ConsumerRecord<byte[], byte[]>> records = topicRecordsPerPartition.get(partition);
- if (records == null) {
- records = new ArrayList<>();
- topicRecordsPerPartition.put(partition, records);
- }
records.add(record);
} else {
throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
@@ -397,7 +378,7 @@ public class EosTestDriver extends SmokeTestUtil {
if (partitionInput.size() != partitionMin.size()) {
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
- + partitionRecords.getKey() + " but received " + partitionMin.size());
+ + partitionRecords.getKey() + " but received " + partitionMin.size());
}
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -439,7 +420,7 @@ public class EosTestDriver extends SmokeTestUtil {
if (partitionInput.size() != partitionSum.size()) {
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
- + partitionRecords.getKey() + " but received " + partitionSum.size());
+ + partitionRecords.getKey() + " but received " + partitionSum.size());
}
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -480,7 +461,7 @@ public class EosTestDriver extends SmokeTestUtil {
if (partitionInput.size() != partitionMax.size()) {
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
- + partitionRecords.getKey() + " but received " + partitionMax.size());
+ + partitionRecords.getKey() + " but received " + partitionMax.size());
}
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -501,7 +482,7 @@ public class EosTestDriver extends SmokeTestUtil {
max = Math.max(max, value);
currentMinPerKey.put(key, max);
- if (!receivedKey.equals(key) || receivedValue != max.intValue()) {
+ if (!receivedKey.equals(key) || receivedValue != max) {
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + max + "> but was <" + receivedKey + "," + receivedValue + ">");
}
}
@@ -521,7 +502,7 @@ public class EosTestDriver extends SmokeTestUtil {
if (partitionInput.size() != partitionCnt.size()) {
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
- + partitionRecords.getKey() + " but received " + partitionCnt.size());
+ + partitionRecords.getKey() + " but received " + partitionCnt.size());
}
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
@@ -539,7 +520,7 @@ public class EosTestDriver extends SmokeTestUtil {
}
currentSumPerKey.put(key, ++cnt);
- if (!receivedKey.equals(key) || receivedValue != cnt.longValue()) {
+ if (!receivedKey.equals(key) || receivedValue != cnt) {
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + receivedKey + "," + receivedValue + ">");
}
}
@@ -574,14 +555,11 @@ public class EosTestDriver extends SmokeTestUtil {
for (final TopicPartition tp : partitions) {
final ProducerRecord<String, String> record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(final RecordMetadata metadata, final Exception exception) {
- if (exception != null) {
- exception.printStackTrace(System.err);
- System.err.flush();
- Exit.exit(1);
- }
+ producer.send(record, (metadata, exception) -> {
+ if (exception != null) {
+ exception.printStackTrace(System.err);
+ System.err.flush();
+ Exit.exit(1);
}
});
}
@@ -591,7 +569,7 @@ public class EosTestDriver extends SmokeTestUtil {
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
System.out.println("No data received.");
for (final TopicPartition tp : partitions) {
@@ -638,4 +616,18 @@ public class EosTestDriver extends SmokeTestUtil {
return partitions;
}
+
+ private static ConsumerGroupDescription getConsumerGroupDescription(final AdminClient adminClient) {
+ final ConsumerGroupDescription description;
+ try {
+ description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
+ .describedGroups()
+ .get(EosTestClient.APP_ID)
+ .get(10, TimeUnit.SECONDS);
+ } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+ e.printStackTrace();
+ throw new RuntimeException("Unexpected Exception getting group description", e);
+ }
+ return description;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 50330a0..7533fdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.TestUtils;
import java.io.File;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -289,7 +290,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
int retry = 0;
final long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
- ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
if (verifyMin(min, allData, false)
&& verifyMax(max, allData, false)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index ad19f32..33cf1fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -74,7 +75,7 @@ public class StreamsResetterTest {
streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(3, records.count());
}
@@ -90,7 +91,7 @@ public class StreamsResetterTest {
streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
@@ -106,7 +107,7 @@ public class StreamsResetterTest {
streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
@@ -122,7 +123,7 @@ public class StreamsResetterTest {
streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
@@ -138,7 +139,7 @@ public class StreamsResetterTest {
streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(5, records.count());
}
@@ -154,7 +155,7 @@ public class StreamsResetterTest {
streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
@@ -172,7 +173,7 @@ public class StreamsResetterTest {
topicPartitionsAndOffset.put(topicPartition, 3L);
streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
@@ -190,7 +191,7 @@ public class StreamsResetterTest {
topicPartitionsAndOffset.put(topicPartition, 1L);
streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
@@ -208,7 +209,7 @@ public class StreamsResetterTest {
topicPartitionsAndOffset.put(topicPartition, 5L);
streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
@@ -226,7 +227,7 @@ public class StreamsResetterTest {
intermediateTopicPartitions.add(topicPartition);
streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
assertEquals(2, records.count());
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 3070e36..00788fd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -85,9 +86,8 @@ public class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
super.assign(partitions);
}
- @Deprecated
@Override
- public ConsumerRecords<byte[], byte[]> poll(long timeout) {
+ public ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
// add buffered records to MockConsumer
for (ConsumerRecord<byte[], byte[]> record : recordBuffer) {
super.addRecord(record);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 773cbb4..7f75265 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -66,6 +66,7 @@ import org.apache.kafka.streams.test.OutputVerifier;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -327,6 +328,7 @@ public class TopologyTestDriver implements Closeable {
consumer,
new StoreChangelogReader(
createRestoreConsumer(processorTopology.storeToChangelogTopic()),
+ Duration.ZERO,
stateRestoreListener,
new LogContext("topology-test-driver ")),
streamsConfig,
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.