You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/11/25 09:48:11 UTC
[flink] 02/02: [FLINK-30056][kafka] Use new Consumer#poll(Duration)
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5e0e0fde6305577192fe8020fcb1fa8ca3bd115e
Author: Sergey Nuyanzin <sn...@gmail.com>
AuthorDate: Wed Nov 23 13:14:50 2022 +0100
[FLINK-30056][kafka] Use new Consumer#poll(Duration)
This changes the blocking behavior, as the previous method essentially ignored the timeout.
---
.../streaming/connectors/kafka/internals/KafkaConsumerThread.java | 3 ++-
.../streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java | 2 +-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
index 4c4cc907944..f7f40b80bae 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
import javax.annotation.Nonnull;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -258,7 +259,7 @@ public class KafkaConsumerThread<T> extends Thread {
// over
if (records == null) {
try {
- records = consumer.poll(pollTimeout);
+ records = consumer.poll(Duration.ofMillis(pollTimeout));
} catch (WakeupException we) {
continue;
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index b9ed7d92f14..2f393374aab 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -238,7 +238,7 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
kafkaConsumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<String, String> records = ConsumerRecords.empty();
while (records.isEmpty()) {
- records = kafkaConsumer.poll(10000);
+ records = kafkaConsumer.poll(Duration.ofMillis(10000));
}
ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);