You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/11/25 09:48:11 UTC

[flink] 02/02: [FLINK-30056][kafka] Use new Consumer#poll(Duration)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e0e0fde6305577192fe8020fcb1fa8ca3bd115e
Author: Sergey Nuyanzin <sn...@gmail.com>
AuthorDate: Wed Nov 23 13:14:50 2022 +0100

    [FLINK-30056][kafka] Use new Consumer#poll(Duration)
    
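    This changes the blocking behavior: the deprecated Consumer#poll(long) could block
    beyond the given timeout (for example while waiting for metadata or partition
    assignment), essentially ignoring it, whereas Consumer#poll(Duration) bounds the
    total time spent blocking by the timeout.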
---
 .../streaming/connectors/kafka/internals/KafkaConsumerThread.java      | 3 ++-
 .../streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
index 4c4cc907944..f7f40b80bae 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -258,7 +259,7 @@ public class KafkaConsumerThread<T> extends Thread {
                 // over
                 if (records == null) {
                     try {
-                        records = consumer.poll(pollTimeout);
+                        records = consumer.poll(Duration.ofMillis(pollTimeout));
                     } catch (WakeupException we) {
                         continue;
                     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index b9ed7d92f14..2f393374aab 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -238,7 +238,7 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
             kafkaConsumer.subscribe(Collections.singletonList(topicName));
             ConsumerRecords<String, String> records = ConsumerRecords.empty();
             while (records.isEmpty()) {
-                records = kafkaConsumer.poll(10000);
+                records = kafkaConsumer.poll(Duration.ofMillis(10000));
             }
 
             ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);