Posted to commits@flink.apache.org by sj...@apache.org on 2020/05/26 20:40:43 UTC

[flink] branch release-1.11 updated: [FLINK-17076][docs] Revamp Kafka Connector Documentation

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f7fa02f  [FLINK-17076][docs] Revamp Kafka Connector Documentation
f7fa02f is described below

commit f7fa02fdb450af9d11c92bd90751fa9161add47a
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue May 19 15:08:13 2020 -0500

    [FLINK-17076][docs] Revamp Kafka Connector Documentation
    
    This closes #12257
---
 docs/dev/connectors/kafka.md | 488 +++++++++++++++----------------------------
 1 file changed, 164 insertions(+), 324 deletions(-)

diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index d084808..6f21ff7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -23,125 +23,59 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading data from and writing data to Kafka topics with exactly-once guarantees.
+
 * This will be replaced by the TOC
 {:toc}
 
-This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
-
-Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide
-exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka's consumer group
-offset tracking, but tracks and checkpoints these offsets internally as well.
-
-Please pick a package (maven artifact id) and class name for your use-case and environment.
-For most users, the `FlinkKafkaConsumer010` (part of `flink-connector-kafka`) is appropriate.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">Maven Dependency</th>
-      <th class="text-left">Supported since</th>
-      <th class="text-left">Consumer and <br>
-      Producer Class name</th>
-      <th class="text-left">Kafka version</th>
-      <th class="text-left">Notes</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td>
-        <td>1.2.0</td>
-        <td>FlinkKafkaConsumer010<br>
-        FlinkKafkaProducer010</td>
-        <td>0.10.x</td>
-        <td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td>
-    </tr>
-    <tr>
-        <td>flink-connector-kafka-0.11{{ site.scala_version_suffix }}</td>
-        <td>1.4.0</td>
-        <td>FlinkKafkaConsumer011<br>
-        FlinkKafkaProducer011</td>
-        <td>0.11.x</td>
-        <td>Since 0.11.x Kafka does not support scala 2.10. This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging">Kafka transactional messaging</a> to provide exactly once semantic for the producer.</td>
-    </tr>
-    <tr>
-        <td>flink-connector-kafka{{ site.scala_version_suffix }}</td>
-        <td>1.7.0</td>
-        <td>FlinkKafkaConsumer<br>
-        FlinkKafkaProducer</td>
-        <td>>= 1.0.0</td>
-        <td>
-        This universal Kafka connector attempts to track the latest version of the Kafka client.
-        The version of the client it uses may change between Flink releases. Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client.
-        Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
-        However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated
-        flink-connector-kafka-0.11{{ site.scala_version_suffix }} and flink-connector-kafka-0.10{{ site.scala_version_suffix }} respectively.
-        </td>
-    </tr>
-  </tbody>
-</table>
-
-Then, import the connector in your maven project:
+## Dependency
+
+Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11.
+The universal Kafka connector attempts to track the latest version of the Kafka client.
+The version of the client it uses may change between Flink releases.
+Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
+For most users, the universal Kafka connector is the most appropriate choice.
+However, for Kafka versions 0.11.x and 0.10.x, we recommend using the dedicated ``0.11`` and ``0.10`` connectors, respectively.
+For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
+<div class="codetabs" markdown="1">
+<div data-lang="universal" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
+	<version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %} 
+</div>
+<div data-lang="011" markdown="1">
+{% highlight xml %}
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-connector-kafka-0.11{{ site.scala_version_suffix }}</artifactId>
+	<version>{{ site.version }}</version>
 </dependency>
 {% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary distribution.
-See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/projectsetup/dependencies.html).
-
-## Installing Apache Kafka
-
-* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
-* If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
-
-## Kafka 1.0.0+ Connector
-
-Starting with Flink 1.7, there is a new universal Kafka connector that does not track a specific Kafka major version.
-Rather, it tracks the latest version of Kafka at the time of the Flink release.
-
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka connector.
-If you use an older version of Kafka (0.11 or 0.10), you should use the connector corresponding to the broker version.
-
-### Compatibility
-
-The universal Kafka connector is compatible with older and newer Kafka brokers through the compatibility guarantees of the Kafka client API and broker.
-It is compatible with broker versions 0.11.0 or newer, depending on the features used.
-For details on Kafka compatibility, please refer to the [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
-
-### Migrating Kafka Connector from 0.11 to universal
-
-In order to perform the migration, see the [upgrading jobs and Flink versions guide]({{ site.baseurl }}/ops/upgrading.html)
-and:
-* Use Flink 1.9 or newer for the whole process.
-* Do not upgrade the Flink and operators at the same time.
-* Make sure that Kafka Consumer and/or Kafka Producer used in your job have assigned unique identifiers (`uid`):
-* Use stop with savepoint feature to take the savepoint (for example by using `stop --withSavepoint`)[CLI command]({{ site.baseurl }}/ops/cli.html).
-
-### Usage
-
-To use the universal Kafka connector add a dependency to it:
-
+</div>
+<div data-lang="010" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</artifactId>
+	<version>{{ site.version }}</version>
 </dependency>
 {% endhighlight %}
+<span class="label label-danger">Attention</span> The ``0.10`` sink does not support exactly-once writes to Kafka.
+</div>
+</div>
 
-Then instantiate the new source (`FlinkKafkaConsumer`) and sink (`FlinkKafkaProducer`).
-The API is backward compatible with the Kafka 0.11 connector,
-except of dropping specific Kafka version from the module and class names.
+Flink's streaming connectors are not currently part of the binary distribution.
+See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/projectsetup/dependencies.html).
 
 ## Kafka Consumer
 
-Flink's Kafka consumer is called `FlinkKafkaConsumer010` (or 011 for Kafka 0.11.0.x versions, etc.
-or just `FlinkKafkaConsumer` for Kafka >= 1.0.0 versions). It provides access to one or more Kafka topics.
+Flink's Kafka consumer, `FlinkKafkaConsumer` (or `FlinkKafkaConsumer011` for Kafka 0.11.x,
+or `FlinkKafkaConsumer010` for Kafka 0.10.x), reads from one or more Kafka topics.
 
 The constructor accepts the following arguments:
 
@@ -152,8 +86,6 @@ The constructor accepts the following arguments:
   - "bootstrap.servers" (comma separated list of Kafka brokers)
   - "group.id" the id of the consumer group
 
-Example:
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -161,7 +93,7 @@ Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("group.id", "test");
 DataStream<String> stream = env
-	.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));
+	.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -170,26 +102,17 @@ val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 stream = env
-    .addSource(new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), properties))
-    .print()
+    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
 {% endhighlight %}
 </div>
 </div>
 
 ### The `DeserializationSchema`
 
-The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The
-`DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
-method gets called for each Kafka message, passing the value from Kafka.
-
-It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
-produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
-to implement the `getProducedType(...)` method themselves.
+The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects.
+The `KafkaDeserializationSchema` allows users to specify such a schema. The `T deserialize(ConsumerRecord<byte[], byte[]> record)` method gets called for each Kafka message, passing the full record, which gives access to its key, value, and metadata.
 
-For accessing the key, value and metadata of the Kafka message, the `KafkaDeserializationSchema` has
-the following deserialize method `T deserialize(ConsumerRecord<byte[], byte[]> record)`.
-
-For convenience, Flink provides the following schemas:
+For convenience, Flink provides the following schemas out of the box:
 
 1. `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`) which creates
     a schema based on a Flink's `TypeInformation`. This is useful if the data is both written and read by Flink.
@@ -216,46 +139,37 @@ For convenience, Flink provides the following schemas:
 <div data-lang="AvroDeserializationSchema" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-avro</artifactId>
-  <version>{{site.version }}</version>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-avro</artifactId>
+    <version>{{site.version }}</version>
 </dependency>
 {% endhighlight %}
 </div>
 <div data-lang="ConfluentRegistryAvroDeserializationSchema" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-avro-confluent-registry</artifactId>
-  <version>{{site.version }}</version>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-avro-confluent-registry</artifactId>
+    <version>{{site.version }}</version>
 </dependency>
 {% endhighlight %}
 </div>
 </div>
 
-When encountering a corrupted message that cannot be deserialized for any reason, there
-are two options - either throwing an exception from the `deserialize(...)` method
-which will cause the job to fail and be restarted, or returning `null` to allow
-the Flink Kafka consumer to silently skip the corrupted message. Note that
-due to the consumer's fault tolerance (see below sections for more details),
-failing the job on the corrupted message will let the consumer attempt
-to deserialize the message again. Therefore, if deserialization still fails, the
-consumer will fall into a non-stop restart and fail loop on that corrupted
-message.
+When encountering a corrupted message that cannot be deserialized for any reason, the deserialization schema should return `null`, which results in the record being skipped.
+Throwing an exception from `deserialize(...)` instead fails the job, and because of the consumer's fault tolerance (see the sections below for more details), the restarted job will attempt to deserialize the same message again.
+Therefore, if deserialization still fails, the consumer will fall into a non-stop restart and fail loop on that corrupted message.
 
 ### Kafka Consumers Start Position Configuration
 
-The Flink Kafka Consumer allows configuring how the start position for Kafka
-partitions are determined.
-
-Example:
+The Flink Kafka Consumer allows configuring how the start positions for Kafka partitions are determined.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(...);
+FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
 myConsumer.setStartFromEarliest();     // start from the earliest record possible
 myConsumer.setStartFromLatest();       // start from the latest record
 myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
@@ -269,7 +183,7 @@ DataStream<String> stream = env.addSource(myConsumer);
 {% highlight scala %}
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
-val myConsumer = new FlinkKafkaConsumer010[String](...)
+val myConsumer = new FlinkKafkaConsumer[String](...)
 myConsumer.setStartFromEarliest()      // start from the earliest record possible
 myConsumer.setStartFromLatest()        // start from the latest record
 myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
@@ -339,34 +253,14 @@ fault tolerance for the consumer).
 ### Kafka Consumers and Fault Tolerance
 
 With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
-its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
+its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore
 the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
 stored in the checkpoint.
 
 The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
+To use fault-tolerant Kafka Consumers, checkpointing of the topology needs to be enabled in the [job]({{ site.baseurl }}/ops/config.html#execution-checkpointing-interval).
 
-To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-</div>
-
-Also note that Flink can only restart the topology if enough processing slots are available to restart the topology.
-So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
-
-If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
+If checkpointing is disabled, the Kafka consumer will periodically commit the offsets back to Kafka.
 
 ### Kafka Consumers Topic and Partition Discovery
 
@@ -380,15 +274,9 @@ By default, partition discovery is disabled. To enable it, set a non-negative va
 for `flink.partition-discovery.interval-millis` in the provided properties config,
 representing the discovery interval in milliseconds. 
 
-<span class="label label-danger">Limitation</span> When the consumer is restored from a savepoint from Flink versions
-prior to Flink 1.3.x, partition discovery cannot be enabled on the restore run. If enabled, the restore would fail
-with an exception. In this case, in order to use partition discovery, please first take a savepoint in Flink 1.3.x and
-then restore again from that.
-
 #### Topic discovery
 
-At a higher-level, the Flink Kafka Consumer is also capable of discovering topics, based on pattern matching on the
-topic names using regular expressions. See the below for an example:
+The Kafka Consumer is also capable of discovering topics by matching topic names using regular expressions.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -399,7 +287,7 @@ Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("group.id", "test");
 
-FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
+FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
     java.util.regex.Pattern.compile("test-topic-[0-9]"),
     new SimpleStringSchema(),
     properties);
@@ -416,7 +304,7 @@ val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 
-val myConsumer = new FlinkKafkaConsumer010[String](
+val myConsumer = new FlinkKafkaConsumer[String](
   java.util.regex.Pattern.compile("test-topic-[0-9]"),
   new SimpleStringSchema,
   properties)
@@ -445,7 +333,7 @@ tolerance guarantees. The committed offsets are only a means to expose
 the consumer's progress for monitoring purposes.
 
 The way to configure offset commit behaviour is different, depending on
-whether or not checkpointing is enabled for the job.
+whether checkpointing is enabled for the job.
 
  - *Checkpointing disabled:* if checkpointing is disabled, the Flink Kafka
  Consumer relies on the automatic periodic offset committing capability
@@ -465,15 +353,14 @@ whether or not checkpointing is enabled for the job.
 
 ### Kafka Consumers and Timestamp Extraction/Watermark Emission
 
-In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
-In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
+In many scenarios, the timestamp of a record is embedded in the record itself or in the metadata of the `ConsumerRecord`.
+In addition, users may want to emit watermarks either periodically, or irregularly, e.g. based on
 special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka
-Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
+Consumer allows the specification of a [watermark strategy]({% link dev/event_time.md %}).
 
-You can specify your custom timestamp extractor/watermark emitter as described
-[here]({{ site.baseurl }}/dev/event_timestamps_watermarks.html), or use one from the
-[predefined ones]({{ site.baseurl }}/dev/event_timestamp_extractors.html). After doing so, you
-can pass it to your consumer in the following way:
+You can specify your custom strategy as described
+[here]({% link dev/event_timestamps_watermarks.md %}), or use one from the
+[predefined ones]({% link dev/event_timestamp_extractors.md %}). 
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -482,13 +369,14 @@ Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("group.id", "test");
 
-FlinkKafkaConsumer010<String> myConsumer =
-    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties);
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+FlinkKafkaConsumer<String> myConsumer =
+    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(
+    WatermarkStrategies
+        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .build());
 
-DataStream<String> stream = env
-	.addSource(myConsumer)
-	.print();
+DataStream<String> stream = env.addSource(myConsumer);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -497,51 +385,51 @@ val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 
-val myConsumer = new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), properties)
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
-stream = env
-    .addSource(myConsumer)
-    .print()
+val myConsumer =
+    new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties)
+myConsumer.assignTimestampsAndWatermarks(
+    WatermarkStrategies
+        .forBoundedOutOfOrderness[String](Duration.ofSeconds(20))
+        .build())
+
+val stream = env.addSource(myConsumer)
 {% endhighlight %}
 </div>
 </div>
 
-Internally, an instance of the assigner is executed per Kafka partition.
-When such an assigner is specified, for each record read from Kafka, the
-`extractTimestamp(T element, long previousElementTimestamp)` is called to assign a timestamp to the record and
-the `Watermark getCurrentWatermark()` (for periodic) or the
-`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) is called to determine
-if a new watermark should be emitted and with which timestamp.
 
 **Note**: If a watermark assigner depends on records read from Kafka to advance its watermarks
 (which is commonly the case), all topics and partitions need to have a continuous stream of records.
 Otherwise, the watermarks of the whole application cannot advance and all time-based operations,
 such as time windows or functions with timers, cannot make progress. A single idle Kafka partition causes this behavior.
-A Flink improvement is planned to prevent this from happening 
-(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions](
-https://issues.apache.org/jira/browse/FLINK-5479)).
-In the meanwhile, a possible workaround is to send *heartbeat messages* to all consumed partitions that advance the watermarks of idle partitions.
-
+Consider setting appropriate [idleness timeouts]({{ site.baseurl }}/dev/event_timestamps_watermarks.html#dealing-with-idle-sources) to mitigate this issue.
+ 
 ## Kafka Producer
 
-Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 0.10.0.x versions, etc. or just `FlinkKafkaProducer` for Kafka >= 1.0.0 versions).
-It allows writing a stream of records to one or more Kafka topics.
+Flink’s Kafka Producer, `FlinkKafkaProducer` (or `FlinkKafkaProducer010` for Kafka 0.10.x versions or `FlinkKafkaProducer011` for Kafka 0.11.x versions),
+allows writing a stream of records to one or more Kafka topics.
+
+The constructor accepts the following arguments:
 
-Example:
+1. A default output topic where events should be written
+2. A SerializationSchema / KafkaSerializationSchema for serializing data into Kafka
+3. Properties for the Kafka client. The following properties are required:
+    * "bootstrap.servers" (comma separated list of Kafka brokers)
+4. A fault-tolerance semantic
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<String> stream = ...;
+DataStream<String> stream = ...;
 
-FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
-        "localhost:9092",            // broker list
-        "my-topic",                  // target topic
-        new SimpleStringSchema());   // serialization schema
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
 
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
-// this method is not available for earlier Kafka versions
-myProducer.setWriteTimestampToKafka(true);
+FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
+        "my-topic",                  // target topic
+        new SimpleStringSchema(),    // serialization schema
+        properties,                  // producer config
+        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
 
 stream.addSink(myProducer);
 {% endhighlight %}
@@ -550,91 +438,36 @@ stream.addSink(myProducer);
 {% highlight scala %}
 val stream: DataStream[String] = ...
 
-val myProducer = new FlinkKafkaProducer011[String](
-        "localhost:9092",         // broker list
-        "my-topic",               // target topic
-        new SimpleStringSchema)   // serialization schema
+val properties = new Properties
+properties.setProperty("bootstrap.servers", "localhost:9092")
 
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
-// this method is not available for earlier Kafka versions
-myProducer.setWriteTimestampToKafka(true)
+val myProducer = new FlinkKafkaProducer[String](
+        "my-topic",                  // target topic
+        new SimpleStringSchema(),    // serialization schema
+        properties,                  // producer config
+        FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
 
 stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>
 
-The above examples demonstrate the basic usage of creating a Flink Kafka Producer
-to write streams to a single Kafka target topic. For more advanced usages, there
-are other constructor variants that allow providing the following:
-
- * *Providing custom properties*:
- The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
- Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
- details on how to configure Kafka Producers.
- * *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
- constructor. This partitioner will be called for each record in the stream
- to determine which exact partition of the target topic the record should be sent to.
- Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
- * *Advanced serialization schema*: Similar to the consumer,
- the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
- which allows serializing the key and value separately. It also allows to override the target topic,
- so that one producer instance can send data to multiple topics.
- 
-### Kafka Producer Partitioning Scheme
- 
-By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use
-a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition
-(i.e., all records received by a sink subtask will end up in the same Kafka partition).
-
-A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. All
-Kafka versions' constructors allow providing a custom partitioner when instantiating the producer.
-Note that the partitioner implementation must be serializable, as they will be transferred across Flink nodes.
-Also, keep in mind that any state in the partitioner will be lost on job failures since the partitioner
-is not part of the producer's checkpointed state.
-
-It is also possible to completely avoid using and kind of partitioner, and simply let Kafka partition
-the written records by their attached key (as determined for each record using the provided serialization schema).
-To do this, provide a `null` custom partitioner when instantiating the producer. It is important
-to provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified
-the `FlinkFixedPartitioner` is used instead.
+### The `SerializationSchema`
 
-### Kafka Producers and Fault Tolerance
+The Flink Kafka Producer needs to know how to turn Java/Scala objects into binary data.
+The `KafkaSerializationSchema` allows users to specify such a schema.
+The `ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp)` method gets called for each record, generating a `ProducerRecord` that is written to Kafka.
 
-#### Kafka 0.10
+This gives users fine-grained control over how data is written out to Kafka.
+Through the producer record you can:
+* Set header values
+* Define keys for each record
+* Specify custom partitioning of data
 
-With Flink's checkpointing enabled, the `FlinkKafkaProducer010`
-can provide at-least-once delivery guarantees.
-
-Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
-
- * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
- Enabling this will let the producer only log failures
- instead of catching and rethrowing them. This essentially accounts the record
- to have succeeded, even if it was never written to the target Kafka topic. This
- must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
- With this enabled, Flink's checkpoints will wait for any
- on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
- succeeding the checkpoint. This ensures that all records before the checkpoint have
- been written to Kafka. This must be enabled for at-least-once.
- 
-In conclusion, the Kafka producer by default has at-least-once guarantees for versions
-0.10, with `setLogFailureOnly` set to `false` and `setFlushOnCheckpoint` set
-to `true`.
-
-**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
-the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
-duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes,
-we recommend setting the number of retries to a higher value.
-
-**Note**: There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
-into a Kafka topic.
-
-#### Kafka 0.11 and newer
+### Kafka Producers and Fault Tolerance
 
+<div class="codetabs" markdown="1">
+<div data-lang="Universal and 011" markdown="1">
 With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka versions >= 1.0.0) can provide
 exactly-once delivery guarantees.
 
@@ -643,9 +476,8 @@ chosen by passing appropriate `semantic` parameter to the `FlinkKafkaProducer011
 
  * `Semantic.NONE`: Flink will not guarantee anything. Produced records can be lost or they can
  be duplicated.
- * `Semantic.AT_LEAST_ONCE` (default setting): similar to `setFlushOnCheckpoint(true)` in
- `FlinkKafkaProducer010`. This guarantees that no records will be lost (although they can be duplicated).
- * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once semantic. Whenever you write
+ * `Semantic.AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+ * `Semantic.EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write
  to Kafka using transactions, do not forget about setting desired `isolation.level` (`read_committed`
  or `read_uncommitted` - the latter one is the default value) for any application consuming records
  from Kafka.
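 
 As a minimal sketch of the `isolation.level` advice above, the snippet below builds the properties for a downstream consumer that should only see committed records. It uses only `java.util.Properties`; the broker address, the group id, and the class name `ReadCommittedConsumerConfig` are assumptions made for illustration and do not come from the Flink or Kafka documentation.
 
 ```java
 import java.util.Properties;
 
 // Illustrative helper (hypothetical name): consumer settings for reading
 // a topic that is written with Kafka transactions.
 class ReadCommittedConsumerConfig {
 
     static Properties build() {
         Properties props = new Properties();
         // Assumed broker address and group id; replace with real values.
         props.setProperty("bootstrap.servers", "localhost:9092");
         props.setProperty("group.id", "downstream-reader");
         // Only return records from committed transactions.
         // Kafka's default is read_uncommitted.
         props.setProperty("isolation.level", "read_committed");
         return props;
     }
 
     public static void main(String[] args) {
         System.out.println(build().getProperty("isolation.level"));
     }
 }
 ```
 
 Properties built this way can be handed to whichever Kafka consumer reads the topic, for example as the `Properties` argument of a `FlinkKafkaConsumer`.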
@@ -695,54 +527,53 @@ event of failure of Flink application before first checkpoint, after restarting
 is no information in the system about previous pool sizes. Thus it is unsafe to scale down Flink
 application before first checkpoint completes, by factor larger than `FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.
 
-## Using Kafka timestamps and Flink event time in Kafka 0.10
-
-Since Apache Kafka 0.10+, Kafka's messages can carry
-[timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
-the time the event has occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message
-has been written to the Kafka broker.
-
-The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if the time characteristic in Flink is 
-set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
-
-The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms as described above in 
-"Kafka Consumers and Timestamp Extraction/Watermark Emission"  using the `assignTimestampsAndWatermarks` method are applicable.
-
-There is no need to define a timestamp extractor when using the timestamps from Kafka. The `previousElementTimestamp` argument of 
-the `extractTimestamp()` method contains the timestamp carried by the Kafka message.
-
-A timestamp extractor for a Kafka consumer would look like this:
-{% highlight java %}
-public long extractTimestamp(Long element, long previousElementTimestamp) {
-    return previousElementTimestamp;
-}
-{% endhighlight %}
-
+</div>
+<div data-lang="010" markdown="1">
+With Flink's checkpointing enabled, the `FlinkKafkaProducer010`
+can provide at-least-once delivery guarantees.
 
+Besides enabling Flink's checkpointing, you should also configure the setter
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
 
-The `FlinkKafkaProducer010` only emits the record timestamp, if `setWriteTimestampToKafka(true)` is set.
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+ Enabling this makes the producer only log failures
+ instead of catching and rethrowing them. The record is then treated
+ as having succeeded, even if it was never written to the target Kafka topic. This
+ must be disabled for at-least-once.
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
+ With this enabled, Flink's checkpoints will wait for any
+ in-flight records at the time of the checkpoint to be acknowledged by Kafka before
+ the checkpoint succeeds. This ensures that all records before the checkpoint have
+ been written to Kafka. This must be enabled for at-least-once.
+ 
+In conclusion, the Kafka producer by default has at-least-once guarantees for version
+0.10, with `setLogFailuresOnly` set to `false` and `setFlushOnCheckpoint` set
+to `true`.
 
-{% highlight java %}
-FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
-config.setWriteTimestampToKafka(true);
-{% endhighlight %}
+**Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
+the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
+duplicate messages in the target topic that are caused by retries. For most production environments with frequent broker changes,
+we recommend setting the number of retries to a higher value.
 
+**Note**: Kafka 0.10 has no transactional producer, so Flink cannot guarantee exactly-once delivery
+into a Kafka topic.
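 
 The retries note above can be sketched as producer properties. This is only an illustration under stated assumptions: the broker address, the value `3`, and the class name `RetryingProducerConfig` are hypothetical, and a higher retry count can produce duplicate messages in the target topic, as described above.
 
 ```java
 import java.util.Properties;
 
 // Illustrative helper (hypothetical name): producer settings with retries enabled.
 class RetryingProducerConfig {
 
     static Properties build() {
         Properties props = new Properties();
         // Assumed broker address; replace with a real one.
         props.setProperty("bootstrap.servers", "localhost:9092");
         // Kafka producer setting. The value 3 is an example only;
         // the note above describes the default as 0.
         props.setProperty("retries", "3");
         return props;
     }
 
     public static void main(String[] args) {
         System.out.println(build().getProperty("retries"));
     }
 }
 ```
 
 Such properties would typically be passed as the producer configuration when constructing the `FlinkKafkaProducer010`.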
 
+</div>
+</div>
 
-## Kafka Connector metrics
+## Kafka Connector Metrics
 
 Flink's Kafka connectors provide some metrics through Flink's [metrics system]({{ site.baseurl }}/monitoring/metrics.html) to analyze
 the behavior of the connector.
-The producers export Kafka's internal metrics through Flink's metric system for all supported versions. The consumers export 
-all metrics starting from Kafka version 0.10. The Kafka documentation lists all exported metrics 
-in its [documentation](http://kafka.apache.org/documentation/#selector_monitoring).
+The producers export Kafka's internal metrics through Flink's metric system for all supported versions.
+Kafka lists all exported metrics in its [documentation](http://kafka.apache.org/documentation/#selector_monitoring).
 
 In addition to these metrics, all consumers expose the `current-offsets` and `committed-offsets` for each topic partition.
 The `current-offsets` refers to the current offset in the partition. This refers to the offset of the last element that
 we retrieved and emitted successfully. The `committed-offsets` is the last committed offset.
 
-The Kafka Consumers in Flink commit the offsets back to the Kafka brokers (Kafka 0.10+). If checkpointing
-is disabled, offsets are committed periodically.
+The Kafka Consumers in Flink commit the offsets back to the Kafka brokers.
+If checkpointing is disabled, offsets are committed periodically.
 With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they've created a checkpoint of their state. 
 This provides users with at-least-once semantics for the offsets committed to Zookeeper or the broker. For offsets checkpointed to Flink, the system 
 provides exactly once guarantees.
@@ -776,6 +607,15 @@ A mismatch in service name between client and server configuration will cause th
 For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/ops/config.html).
 You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally sets up Kerberos-based security.
 
+## Migrating Kafka Connector from 0.11 to universal
+
+In order to perform the migration, see the [upgrading jobs and Flink versions guide]({{ site.baseurl }}/ops/upgrading.html)
+and:
+* Use Flink 1.9 or newer for the whole process.
+* Do not upgrade Flink and user operators at the same time.
+* Make sure that the Kafka Consumer and/or Kafka Producer used in your job have unique identifiers (`uid`) assigned.
+* Use the stop-with-savepoint feature to take the savepoint (for example by using the `stop --withSavepoint` [CLI command]({{ site.baseurl }}/ops/cli.html)).
+
 ## Troubleshooting
 
 <div class="alert alert-warning">