Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/26 08:55:11 UTC

[flink] branch master updated (873c481 -> 928d656)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 873c481  [hotfix][filesystem] Fix the typo in InProgressFileWriter
     new c6f14ca  [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient in OffsetsInitializer
     new 928d656  [hotfix] fix a typo in KafkaTestBase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../source/enumerator/KafkaSourceEnumerator.java   | 117 +++++++++++++++------
 .../enumerator/initializer/OffsetsInitializer.java |  12 +--
 .../source/enumerator/KafkaEnumeratorTest.java     |   8 --
 .../initializer/OffsetsInitializerTest.java        |   4 +-
 .../streaming/connectors/kafka/KafkaTestBase.java  |   4 +-
 5 files changed, 91 insertions(+), 54 deletions(-)

[flink] 01/02: [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient in OffsetsInitializer

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6f14ca5b10d30232966bce2f52e9f9128346473
Author: dengziming <de...@growingio.com>
AuthorDate: Sat Dec 18 10:21:29 2021 +0800

    [FLINK-25368][connectors/kafka] Substitute KafkaConsumer with AdminClient in OffsetsInitializer
---
 .../source/enumerator/KafkaSourceEnumerator.java   | 117 +++++++++++++++------
 .../enumerator/initializer/OffsetsInitializer.java |  12 +--
 .../source/enumerator/KafkaEnumeratorTest.java     |   8 --
 .../initializer/OffsetsInitializerTest.java        |   4 +-
 4 files changed, 89 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 8b89b7e..27dbd22 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -31,12 +31,13 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /** The enumerator class for Kafka source. */
 @Internal
@@ -81,7 +83,6 @@ public class KafkaSourceEnumerator
     private final String consumerGroupId;
 
     // Lazily instantiated or mutable fields.
-    private KafkaConsumer<byte[], byte[]> consumer;
     private AdminClient adminClient;
 
     // This flag will be marked as true if periodically partition discovery is disabled AND the
@@ -147,7 +148,6 @@ public class KafkaSourceEnumerator
      */
     @Override
     public void start() {
-        consumer = getKafkaConsumer();
         adminClient = getKafkaAdminClient();
         if (partitionDiscoveryIntervalMs > 0) {
             LOG.info(
@@ -200,9 +200,6 @@ public class KafkaSourceEnumerator
 
     @Override
     public void close() {
-        if (consumer != null) {
-            consumer.close();
-        }
         if (adminClient != null) {
             adminClient.close();
         }
@@ -402,25 +399,6 @@ public class KafkaSourceEnumerator
         return new PartitionChange(fetchedPartitions, removedPartitions);
     }
 
-    private KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
-        Properties consumerProps = new Properties();
-        deepCopyProperties(properties, consumerProps);
-        // set client id prefix
-        String clientIdPrefix =
-                consumerProps.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
-        consumerProps.setProperty(
-                ConsumerConfig.CLIENT_ID_CONFIG, clientIdPrefix + "-enumerator-consumer");
-        consumerProps.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        consumerProps.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        // Disable auto topic creation.
-        consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
-        return new KafkaConsumer<>(consumerProps);
-    }
-
     private AdminClient getKafkaAdminClient() {
         Properties adminClientProps = new Properties();
         deepCopyProperties(properties, adminClientProps);
@@ -434,7 +412,7 @@ public class KafkaSourceEnumerator
 
     private OffsetsInitializer.PartitionOffsetsRetriever getOffsetsRetriever() {
         String groupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-        return new PartitionOffsetsRetrieverImpl(consumer, adminClient, groupId);
+        return new PartitionOffsetsRetrieverImpl(adminClient, groupId);
     }
 
     /**
@@ -514,13 +492,10 @@ public class KafkaSourceEnumerator
     @VisibleForTesting
     public static class PartitionOffsetsRetrieverImpl
             implements OffsetsInitializer.PartitionOffsetsRetriever, AutoCloseable {
-        private final KafkaConsumer<?, ?> consumer;
         private final AdminClient adminClient;
         private final String groupId;
 
-        public PartitionOffsetsRetrieverImpl(
-                KafkaConsumer<?, ?> consumer, AdminClient adminClient, String groupId) {
-            this.consumer = consumer;
+        public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {
             this.adminClient = adminClient;
             this.groupId = groupId;
         }
@@ -547,6 +522,7 @@ public class KafkaSourceEnumerator
                                 })
                         .get();
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 throw new FlinkRuntimeException(
                         "Interrupted while listing offsets for consumer group " + groupId, e);
             } catch (ExecutionException e) {
@@ -558,25 +534,96 @@ public class KafkaSourceEnumerator
             }
         }
 
+        /**
+         * List offsets for the specified partitions and OffsetSpec. This operation enables to find
+         * the beginning offset, end offset as well as the offset matching a timestamp in
+         * partitions.
+         *
+         * @see KafkaAdminClient#listOffsets(Map)
+         * @param topicPartitionOffsets The mapping from partition to the OffsetSpec to look up.
+         * @return The list offsets result.
+         */
+        private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets(
+                Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
+            try {
+                return adminClient
+                        .listOffsets(topicPartitionOffsets)
+                        .all()
+                        .thenApply(
+                                result -> {
+                                    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
+                                            offsets = new HashMap<>();
+                                    result.forEach(
+                                            (tp, listOffsetsResultInfo) -> {
+                                                if (listOffsetsResultInfo != null) {
+                                                    offsets.put(tp, listOffsetsResultInfo);
+                                                }
+                                            });
+                                    return offsets;
+                                })
+                        .get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new FlinkRuntimeException(
+                        "Interrupted while listing offsets for topic partitions: "
+                                + topicPartitionOffsets,
+                        e);
+            } catch (ExecutionException e) {
+                throw new FlinkRuntimeException(
+                        "Failed to list offsets for topic partitions: "
+                                + topicPartitionOffsets
+                                + " due to",
+                        e);
+            }
+        }
+
+        private Map<TopicPartition, Long> listOffsets(
+                Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
+            return listOffsets(
+                            partitions.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    partition -> partition, __ -> offsetSpec)))
+                    .entrySet().stream()
+                    .collect(
+                            Collectors.toMap(
+                                    Map.Entry::getKey, entry -> entry.getValue().offset()));
+        }
+
         @Override
         public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
-            return consumer.endOffsets(partitions);
+            return listOffsets(partitions, OffsetSpec.latest());
         }
 
         @Override
         public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
-            return consumer.beginningOffsets(partitions);
+            return listOffsets(partitions, OffsetSpec.earliest());
         }
 
         @Override
         public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
                 Map<TopicPartition, Long> timestampsToSearch) {
-            return consumer.offsetsForTimes(timestampsToSearch);
+            return listOffsets(
+                            timestampsToSearch.entrySet().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    Map.Entry::getKey,
+                                                    entry ->
+                                                            OffsetSpec.forTimestamp(
+                                                                    entry.getValue()))))
+                    .entrySet().stream()
+                    .collect(
+                            Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    entry ->
+                                            new OffsetAndTimestamp(
+                                                    entry.getValue().offset(),
+                                                    entry.getValue().timestamp(),
+                                                    entry.getValue().leaderEpoch())));
         }
 
         @Override
         public void close() throws Exception {
-            consumer.close(Duration.ZERO);
             adminClient.close(Duration.ZERO);
         }
     }
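
The heart of the patch is visible above: every offset lookup now goes through AdminClient#listOffsets with an OffsetSpec, where the old code delegated to the consumer's endOffsets, beginningOffsets, and offsetsForTimes. A minimal standalone sketch of that API follows; the broker address and topic name are placeholders, not taken from the patch:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class ListOffsetsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                TopicPartition tp = new TopicPartition("example-topic", 0);
                // OffsetSpec.latest() plays the role of consumer.endOffsets(...);
                // OffsetSpec.earliest() and OffsetSpec.forTimestamp(ts) cover the
                // beginningOffsets and offsetsForTimes cases respectively.
                Map<TopicPartition, ListOffsetsResultInfo> offsets =
                        admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                                .all()
                                .get();
                System.out.println("End offset of " + tp + ": " + offsets.get(tp).offset());
            }
        }
    }

Note also the small robustness fix folded into the patch: both catch blocks now restore the interrupt status via Thread.currentThread().interrupt() before wrapping the InterruptedException in a FlinkRuntimeException, so callers further up the stack can still observe the interruption.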
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index 9774cb8..6de272d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -24,7 +24,6 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
@@ -34,7 +33,8 @@ import java.util.Collection;
 import java.util.Map;
 
 /**
- * A interface for users to specify the starting / stopping offset of a {@link KafkaPartitionSplit}.
+ * An interface for users to specify the starting / stopping offset of a {@link
+ * KafkaPartitionSplit}.
  *
  * @see ReaderHandledOffsetsInitializer
  * @see SpecifiedOffsetsInitializer
@@ -85,13 +85,13 @@ public interface OffsetsInitializer extends Serializable {
          */
         Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions);
 
-        /** @see KafkaConsumer#endOffsets(Collection) */
+        /** List end offsets for the specified partitions. */
         Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
 
-        /** @see KafkaConsumer#beginningOffsets(Collection) */
+        /** List beginning offsets for the specified partitions. */
         Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
 
-        /** @see KafkaConsumer#offsetsForTimes(Map) */
+        /** List offsets matching a timestamp for the specified partitions. */
         Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
                 Map<TopicPartition, Long> timestampsToSearch);
     }
@@ -130,7 +130,7 @@ public interface OffsetsInitializer extends Serializable {
      * @param timestamp the timestamp to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
      *     timestamp.
-     * @see KafkaConsumer#offsetsForTimes(Map)
+     * @see KafkaAdminClient#listOffsets(Map)
      */
     static OffsetsInitializer timestamp(long timestamp) {
         return new TimestampOffsetsInitializer(timestamp);
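
None of this changes the user-facing contract: OffsetsInitializer remains the handle through which a KafkaSource chooses its starting or stopping offsets, and OffsetsInitializer.timestamp(...) now simply resolves through AdminClient#listOffsets under the hood. An illustrative builder snippet, with placeholder broker, topic, and group values (not taken from the patch):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("example-topic")
                    .setGroupId("example-group")
                    // Start from the first record whose timestamp is >= the given epoch millis.
                    .setStartingOffsets(OffsetsInitializer.timestamp(1640995200000L))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();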
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 9216172..2a3200b 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.mock.Whitebox;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.AfterClass;
@@ -313,15 +312,8 @@ public class KafkaEnumeratorTest {
                     defaultTimeoutMs,
                     Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
 
-            KafkaConsumer<?, ?> consumer =
-                    (KafkaConsumer<?, ?>) Whitebox.getInternalState(enumerator, "consumer");
-            assertNotNull(consumer);
-            clientId = (String) Whitebox.getInternalState(consumer, "clientId");
             assertNotNull(clientId);
             assertTrue(clientId.startsWith(clientIdPrefix));
-            assertEquals(
-                    (long) defaultTimeoutMs,
-                    Whitebox.getInternalState(consumer, "requestTimeoutMs"));
         }
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index 6c696f0..4b74bbe 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -51,9 +51,7 @@ public class OffsetsInitializerTest {
         KafkaSourceTestEnv.setupTopic(TOPIC2, false, false, KafkaSourceTestEnv::getRecordsForTopic);
         retriever =
                 new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl(
-                        KafkaSourceTestEnv.getConsumer(),
-                        KafkaSourceTestEnv.getAdminClient(),
-                        KafkaSourceTestEnv.GROUP_ID);
+                        KafkaSourceTestEnv.getAdminClient(), KafkaSourceTestEnv.GROUP_ID);
     }
 
     @AfterClass
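
The test setup above now exercises the slimmed-down two-argument constructor. Because PartitionOffsetsRetrieverImpl is AutoCloseable (its close() shuts down the admin client), standalone use would look roughly like the following sketch, where the admin client, group id, and partition are illustrative:

    try (KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl retriever =
            new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl(adminClient, "example-group")) {
        // Same semantics as consumer.endOffsets(...), now served by the admin client.
        Map<TopicPartition, Long> ends =
                retriever.endOffsets(
                        Collections.singletonList(new TopicPartition("example-topic", 0)));
    }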

[flink] 02/02: [hotfix] fix a typo in KafkaTestBase

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 928d6569de10732bb7f5f84009d88d3daf643f43
Author: dengziming <de...@growingio.com>
AuthorDate: Fri Dec 24 10:53:32 2021 +0800

    [hotfix] fix a typo in KafkaTestBase
---
 .../org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 5c1aa25..5246242 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -152,7 +152,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
     public static void startClusters(KafkaTestEnvironment.Config environmentConfig)
             throws Exception {
-        kafkaServer = constructKafkaTestEnvionment();
+        kafkaServer = constructKafkaTestEnvironment();
 
         LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
@@ -171,7 +171,7 @@ public abstract class KafkaTestBase extends TestLogger {
         }
     }
 
-    public static KafkaTestEnvironment constructKafkaTestEnvionment() throws Exception {
+    public static KafkaTestEnvironment constructKafkaTestEnvironment() throws Exception {
         Class<?> clazz =
                 Class.forName(
                         "org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");