Posted to commits@flink.apache.org by tz...@apache.org on 2017/12/04 16:40:55 UTC
[1/2] flink git commit: [FLINK-8190] [kafka] Add constructors to
expose topic pattern-based subscription
Repository: flink
Updated Branches:
refs/heads/master 7c5a6941b -> 7e9007450
[FLINK-8190] [kafka] Add constructors to expose topic pattern-based subscription
This closes #5117.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c40b84df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c40b84df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c40b84df
Branch: refs/heads/master
Commit: c40b84dfcf748d04f4e3866bf2c60b183c9cf6b3
Parents: 7c5a694
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 4 15:05:46 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Dec 5 00:31:57 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 55 ++++++++++++++++-
.../connectors/kafka/FlinkKafkaConsumer010.java | 45 ++++++++++++++
.../connectors/kafka/FlinkKafkaConsumer011.java | 45 ++++++++++++++
.../connectors/kafka/FlinkKafkaConsumer08.java | 62 ++++++++++++++++++-
.../connectors/kafka/FlinkKafkaConsumer09.java | 64 +++++++++++++++++++-
5 files changed, 265 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 5376d5b..6c80370 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -294,7 +294,9 @@ Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
-### Kafka Consumers Partition Discovery
+### Kafka Consumers Topic and Partition Discovery
+
+#### Partition discovery
The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with
exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the
@@ -309,6 +311,57 @@ prior to Flink 1.3.x, partition discovery cannot be enabled on the restore run.
with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and
then restore again from that.
+#### Topic discovery
+
+At a higher level, the Flink Kafka Consumer is also capable of discovering topics by matching the
+topic names against a regular expression. See below for an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
+ java.util.regex.Pattern.compile("test-topic-[0-9]"),
+ new SimpleStringSchema(),
+ properties);
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
+properties.setProperty("group.id", "test")
+
+val myConsumer = new FlinkKafkaConsumer08[String](
+ java.util.regex.Pattern.compile("test-topic-[0-9]"),
+ new SimpleStringSchema,
+ properties)
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+</div>
+</div>
+
+In the above example, all topics with names that match the specified regular expression
+(starting with `test-topic-` and ending with a single digit) will be subscribed to by the consumer
+when the job starts running.
+
+To allow the consumer to discover dynamically created topics after the job has started running,
+set a non-negative value for `flink.partition-discovery.interval-millis`. This allows
+the consumer to discover partitions of new topics with names that also match the specified
+pattern.
+
### Kafka Consumers Offset Committing Behaviour Configuration
The Flink Kafka Consumer allows configuring the behaviour of how offsets
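The topic discovery text in the kafka.md change above describes the pattern `test-topic-[0-9]` as matching names that start with `test-topic-` and end with a single digit. The following is a minimal sketch of that reading using only `java.util.regex`. It assumes the consumer requires the whole topic name to match the pattern; the class name `TopicPatternSketch`, the helper `matchingTopics`, and the sample topic names are hypothetical and not part of this commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternSketch {

    // Same pattern as in the documentation example.
    static final Pattern SUBSCRIPTION_PATTERN = Pattern.compile("test-topic-[0-9]");

    // Assumption: a topic qualifies only if its entire name matches the pattern,
    // which is what Matcher#matches() checks (as opposed to Matcher#find()).
    static List<String> matchingTopics(Pattern pattern, List<String> topicNames) {
        List<String> matched = new ArrayList<>();
        for (String topic : topicNames) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topic names, for illustration only.
        List<String> topics = Arrays.asList(
            "test-topic-1", "test-topic-10", "my-test-topic-3", "test-topic-9");

        // Prints [test-topic-1, test-topic-9]
        System.out.println(matchingTopics(SUBSCRIPTION_PATTERN, topics));
    }
}
```

Under this assumption, `test-topic-10` is not selected because the pattern allows exactly one trailing digit. A pattern such as `test-topic-[0-9]+` would be needed to cover multi-digit suffixes.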
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index f569477..6fb63e1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Pattern;
/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -126,6 +128,49 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
super(topics, deserializer, props);
}
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+ this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param deserializer
+ * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ super(subscriptionPattern, deserializer, props);
+ }
+
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
index 6f75828..c40463e 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -24,6 +25,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.regex.Pattern;
/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -110,4 +112,47 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
public FlinkKafkaConsumer011(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+ this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param deserializer
+ * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer011(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ super(subscriptionPattern, deserializer, props);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0a70f61..f362046 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -156,13 +158,67 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ this(topics, null, deserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+ this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param deserializer
+ * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ this(null, subscriptionPattern, deserializer, props);
+ }
+
+ private FlinkKafkaConsumer08(
+ List<String> topics,
+ Pattern subscriptionPattern,
+ KeyedDeserializationSchema<T> deserializer,
+ Properties props) {
+
super(
topics,
- null,
+ subscriptionPattern,
deserializer,
- getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+ getLong(
+ checkNotNull(props, "props"),
+ KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
- this.kafkaProperties = checkNotNull(props, "props");
+ this.kafkaProperties = props;
// validate the zookeeper properties
validateZooKeeperConfig(props);
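The hunk above moves `checkNotNull(props, "props")` into the argument list of the `super(...)` call. A likely reason is that the `super(...)` call had to be the first statement of the constructor, so a null check written after it would run only once `getLong` had already dereferenced `props`. The sketch below illustrates that ordering with stand-in classes. `BaseConsumer`, `Consumer`, the local `getLong` helper, and the `DISABLED` value are hypothetical simplifications, and `Objects.requireNonNull` stands in for Flink's `checkNotNull`.

```java
import java.util.Objects;
import java.util.Properties;

public class ConstructorNullCheckSketch {

    // Stand-in for the base consumer, which receives the discovery interval.
    static class BaseConsumer {
        final long discoveryIntervalMillis;

        BaseConsumer(long discoveryIntervalMillis) {
            this.discoveryIntervalMillis = discoveryIntervalMillis;
        }
    }

    static class Consumer extends BaseConsumer {
        static final String KEY = "flink.partition-discovery.interval-millis";
        // Assumption: a sentinel meaning "discovery disabled"; the value is illustrative.
        static final long DISABLED = Long.MIN_VALUE;

        final Properties properties;

        Consumer(Properties props) {
            // The null check is evaluated as part of the argument expression,
            // so it runs before getLong touches props.
            super(getLong(Objects.requireNonNull(props, "props"), KEY, DISABLED));
            this.properties = props;
        }

        // Simplified stand-in for a properties helper that parses a long value.
        static long getLong(Properties config, String key, long defaultValue) {
            String value = config.getProperty(key);
            return value == null ? defaultValue : Long.parseLong(value);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(Consumer.KEY, "500");
        System.out.println(new Consumer(props).discoveryIntervalMillis);

        try {
            new Consumer(null);
        } catch (NullPointerException e) {
            // The exception carries the message supplied to the null check.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the check inlined, a null `props` fails with a named message instead of an anonymous `NullPointerException` raised from inside the helper.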
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 65be712..79be73c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -42,6 +43,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -147,9 +149,67 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
- super(topics, null, deserializer, getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+ this(topics, null, deserializer, props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+ this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+ }
+
+ /**
+ * Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
+ * subscribe to multiple topics based on a regular expression pattern.
+ *
+ * <p>If partition discovery is enabled (by setting a non-negative value for
+ * {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+ * with names matching the pattern will also be subscribed to as they are created on the fly.
+ *
+ * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+ * pairs, offsets, and topic names from Kafka.
+ *
+ * @param subscriptionPattern
+ * The regular expression for a pattern of topic names to subscribe to.
+ * @param deserializer
+ * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ @PublicEvolving
+ public FlinkKafkaConsumer09(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ this(null, subscriptionPattern, deserializer, props);
+ }
+
+ private FlinkKafkaConsumer09(
+ List<String> topics,
+ Pattern subscriptionPattern,
+ KeyedDeserializationSchema<T> deserializer,
+ Properties props) {
+
+ super(
+ topics,
+ subscriptionPattern,
+ deserializer,
+ getLong(
+ checkNotNull(props, "props"),
+ KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
- this.properties = checkNotNull(props, "props");
+ this.properties = props;
setDeserializer(this.properties);
// configure the polling timeout
[2/2] flink git commit: [hotfix] [kafka] Fix outdated Javadoc
reference to non-existing restoreState method
Posted by tz...@apache.org.
[hotfix] [kafka] Fix outdated Javadoc reference to non-existing restoreState method
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e900745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e900745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e900745
Branch: refs/heads/master
Commit: 7e90074502d0ac99dfedfbcc33190fc5a4898630
Parents: c40b84d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Dec 5 00:34:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Dec 5 00:39:47 2017 +0800
----------------------------------------------------------------------
.../flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e900745/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 7cd1ae1..865d66c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -162,8 +162,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
- * <p>This map will be populated either by the legacy {@link #restoreState(HashMap)} method
- * or {@link #initializeState(FunctionInitializationContext)}.
+ * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
*
* <p>Using a sorted map as the ordering is important when using restored state
* to seed the partition discoverer.