Posted to commits@flink.apache.org by tz...@apache.org on 2017/12/04 16:40:55 UTC

[1/2] flink git commit: [FLINK-8190] [kafka] Add constructors to expose topic pattern-based subscription

Repository: flink
Updated Branches:
  refs/heads/master 7c5a6941b -> 7e9007450


[FLINK-8190] [kafka] Add constructors to expose topic pattern-based subscription

This closes #5117.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c40b84df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c40b84df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c40b84df

Branch: refs/heads/master
Commit: c40b84dfcf748d04f4e3866bf2c60b183c9cf6b3
Parents: 7c5a694
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 4 15:05:46 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Dec 5 00:31:57 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    | 55 ++++++++++++++++-
 .../connectors/kafka/FlinkKafkaConsumer010.java | 45 ++++++++++++++
 .../connectors/kafka/FlinkKafkaConsumer011.java | 45 ++++++++++++++
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 62 ++++++++++++++++++-
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 64 +++++++++++++++++++-
 5 files changed, 265 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 5376d5b..6c80370 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -294,7 +294,9 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
 
-### Kafka Consumers Partition Discovery
+### Kafka Consumers Topic and Partition Discovery
+
+#### Partition discovery
 
 The Flink Kafka Consumer supports discovering dynamically created Kafka partitions, and consumes them with
 exactly-once guarantees. All partitions discovered after the initial retrieval of partition metadata (i.e., when the
@@ -309,6 +311,57 @@ prior to Flink 1.3.x, partition discovery cannot be enabled on the restore run.
 with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and
 then restore again from that.
 
+#### Topic discovery
+
+At a higher level, the Flink Kafka Consumer is also capable of discovering topics by matching
+topic names against a regular expression. See the example below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
+    java.util.regex.Pattern.compile("test-topic-[0-9]"),
+    new SimpleStringSchema(),
+    properties);
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
+properties.setProperty("group.id", "test")
+
+val myConsumer = new FlinkKafkaConsumer08[String](
+  java.util.regex.Pattern.compile("test-topic-[0-9]"),
+  new SimpleStringSchema,
+  properties)
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+</div>
+</div>
+
+In the above example, all topics with names that match the specified regular expression
+(starting with `test-topic-` and ending with a single digit) will be subscribed to by the
+consumer when the job starts running.
+
+To allow the consumer to discover dynamically created topics after the job has started running,
+set a non-negative value for `flink.partition-discovery.interval-millis`. This allows
+the consumer to also discover partitions of new topics whose names match the specified
+pattern.
+
 ### Kafka Consumers Offset Committing Behaviour Configuration
 
 The Flink Kafka Consumer allows configuring the behaviour of how offsets

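A worked example for the topic discovery section added above: pattern-based subscription by
itself only fixes the topic set at submission time, and runtime discovery of new matching
topics additionally requires the discovery interval property. Below is a minimal,
self-contained sketch that combines the two; it assumes the same local setup as the doc
examples, and the 10-second interval is an arbitrary illustrative choice.

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class PatternSubscriptionExample {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:9092");
		properties.setProperty("group.id", "test");
		// a non-negative value enables periodic discovery of new partitions,
		// and of new topics whose names match the subscription pattern
		properties.setProperty("flink.partition-discovery.interval-millis", "10000");

		FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
				Pattern.compile("test-topic-[0-9]"),
				new SimpleStringSchema(),
				properties);

		DataStream<String> stream = env.addSource(myConsumer);
		stream.print();

		env.execute("Pattern-based Kafka subscription");
	}
}
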
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index f569477..6fb63e1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -126,6 +128,49 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 		super(topics, deserializer, props);
 	}
 
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(subscriptionPattern, deserializer, props);
+	}
+
 	@Override
 	protected AbstractFetcher<T, ?> createFetcher(
 			SourceContext<T> sourceContext,

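Because the pattern constructors added above can subscribe one consumer to many topics, the
KeyedDeserializationSchema variant is the natural fit whenever downstream operators need to
know which topic a record came from. A minimal sketch against the interface as imported in
this file (the class name TopicTaggingSchema is hypothetical, not part of the commit):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// prefixes every record with the topic it was read from, so that a single
// pattern-subscribed consumer can keep records from different topics apart
public class TopicTaggingSchema implements KeyedDeserializationSchema<String> {

	@Override
	public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
		return topic + ":" + new String(message, StandardCharsets.UTF_8);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		// the Kafka source is unbounded; never signal end of stream here
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}

It would be passed where the examples above pass a wrapped value deserializer, e.g.
new FlinkKafkaConsumer010<>(Pattern.compile("test-topic-[0-9]"), new TopicTaggingSchema(), properties).
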
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
index 6f75828..c40463e 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -24,6 +25,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -110,4 +112,47 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
 	public FlinkKafkaConsumer011(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
 		super(topics, deserializer, props);
 	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer011(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(subscriptionPattern, deserializer, props);
+	}
 }

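The value-only constructors in this and the other connector versions delegate through
KeyedDeserializationSchemaWrapper. For readers unfamiliar with that adapter, the following is
a hypothetical re-creation of its shape (Flink ships the real class in
org.apache.flink.streaming.util.serialization); it simply ignores the key and the Kafka
metadata and forwards the message value:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// adapts a value-only DeserializationSchema to the keyed interface by
// dropping the key, topic, partition, and offset arguments
public class ValueOnlySchemaAdapter<T> implements KeyedDeserializationSchema<T> {

	private final DeserializationSchema<T> valueDeserializer;

	public ValueOnlySchemaAdapter(DeserializationSchema<T> valueDeserializer) {
		this.valueDeserializer = valueDeserializer;
	}

	@Override
	public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
		// only the message value is deserialized; everything else is ignored
		return valueDeserializer.deserialize(message);
	}

	@Override
	public boolean isEndOfStream(T nextElement) {
		return valueDeserializer.isEndOfStream(nextElement);
	}

	@Override
	public TypeInformation<T> getProducedType() {
		return valueDeserializer.getProducedType();
	}
}
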
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0a70f61..f362046 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -156,13 +158,67 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(topics, null, deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client and the ZooKeeper client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client and the ZooKeeper client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(null, subscriptionPattern, deserializer, props);
+	}
+
+	private FlinkKafkaConsumer08(
+			List<String> topics,
+			Pattern subscriptionPattern,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties props) {
+
 		super(
 				topics,
-				null,
+				subscriptionPattern,
 				deserializer,
-				getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+				getLong(
+					checkNotNull(props, "props"),
+					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
 
-		this.kafkaProperties = checkNotNull(props, "props");
+		this.kafkaProperties = props;
 
 		// validate the zookeeper properties
 		validateZooKeeperConfig(props);

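Beyond exposing the pattern constructors, the private delegating constructor above also fixes
an argument-validation ordering issue: previously, checkNotNull(props) only ran after the
super(...) call had already handed props to getLong, so a null argument surfaced as an
unexplained NullPointerException. Since Java does not allow statements before super(...), the
check now runs inside the argument expression itself. A generic sketch of the idiom, with
hypothetical class and key names (Flink's real getLong and checkNotNull come from
PropertiesUtil and Preconditions, per the imports in this file):

import java.util.Properties;

class Base {
	Base(long discoveryIntervalMillis) {
		// superclass bookkeeping elided
	}
}

class Derived extends Base {

	private final Properties props;

	Derived(Properties props) {
		// the null check runs inside the super(...) argument expression,
		// so a null `props` fails fast with a clear message instead of
		// an unexplained NPE inside getLong(...)
		super(getLong(checkNotNull(props, "props"), "interval-millis", -1L));
		this.props = props;
	}

	private static long getLong(Properties properties, String key, long defaultValue) {
		String value = properties.getProperty(key);
		return value == null ? defaultValue : Long.parseLong(value);
	}

	private static <T> T checkNotNull(T reference, String message) {
		if (reference == null) {
			throw new NullPointerException(message);
		}
		return reference;
	}
}
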
http://git-wip-us.apache.org/repos/asf/flink/blob/c40b84df/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 65be712..79be73c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -42,6 +43,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.PropertiesUtil.getLong;
@@ -147,9 +149,67 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	 *           The properties that are used to configure both the fetcher and the offset handler.
 	 */
 	public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
-		super(topics, null, deserializer, getLong(props, KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
+		this(topics, null, deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.9.x. Use this constructor to
+	 * subscribe to multiple topics based on a regular expression pattern.
+	 *
+	 * <p>If partition discovery is enabled (by setting a non-negative value for
+	 * {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
+	 * with names matching the pattern will also be subscribed to as they are created on the fly.
+	 *
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param subscriptionPattern
+	 *           The regular expression for a pattern of topic names to subscribe to.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client.
+	 */
+	@PublicEvolving
+	public FlinkKafkaConsumer09(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(null, subscriptionPattern, deserializer, props);
+	}
+
+	private FlinkKafkaConsumer09(
+			List<String> topics,
+			Pattern subscriptionPattern,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties props) {
+
+		super(
+				topics,
+				subscriptionPattern,
+				deserializer,
+				getLong(
+					checkNotNull(props, "props"),
+					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED));
 
-		this.properties = checkNotNull(props, "props");
+		this.properties = props;
 		setDeserializer(this.properties);
 
 		// configure the polling timeout


[2/2] flink git commit: [hotfix] [kafka] Fix outdated Javadoc reference to non-existing restoreState method

Posted by tz...@apache.org.
[hotfix] [kafka] Fix outdated Javadoc reference to non-existing restoreState method


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e900745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e900745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e900745

Branch: refs/heads/master
Commit: 7e90074502d0ac99dfedfbcc33190fc5a4898630
Parents: c40b84d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Dec 5 00:34:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Dec 5 00:39:47 2017 +0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e900745/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 7cd1ae1..865d66c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -162,8 +162,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	/**
 	 * The offsets to restore to, if the consumer restores state from a checkpoint.
 	 *
-	 * <p>This map will be populated either by the legacy {@link #restoreState(HashMap)} method
-	 * or {@link #initializeState(FunctionInitializationContext)}.
+	 * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
 	 *
 	 * <p>Using a sorted map as the ordering is important when using restored state
 	 * to seed the partition discoverer.