Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/06 05:51:57 UTC

[04/11] flink git commit: [FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour

[FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f68e790
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f68e790
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f68e790

Branch: refs/heads/master
Commit: 9f68e790fc28197f89638cc83d1612f8f7a796a8
Parents: b49ead3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:21:20 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:15:42 2018 -0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    | 46 +++++++----
 .../connectors/kafka/FlinkKafkaProducer010.java | 83 ++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer011.java | 87 +++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer08.java  | 78 ++++++++++++++++--
 .../connectors/kafka/FlinkKafkaProducer09.java  | 83 ++++++++++++++++---
 5 files changed, 322 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index c6461f9..e2df5fd 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -462,11 +462,8 @@ FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false);   // "false" by default
-myProducer.setFlushOnCheckpoint(true);  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true);
 
 stream.addSink(myProducer);
@@ -481,11 +478,8 @@ val myProducer = new FlinkKafkaProducer011[String](
         "my-topic",               // target topic
         new SimpleStringSchema)   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false)   // "false" by default
-myProducer.setFlushOnCheckpoint(true)  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true)
 
 stream.addSink(myProducer)
@@ -505,11 +499,30 @@ are other constructor variants that allow providing the following:
  partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
  constructor. This partitioner will be called for each record in the stream
  to determine which exact partition of the target topic the record should be sent to.
+ Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
  * *Advanced serialization schema*: Similar to the consumer,
  the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
 which allows serializing the key and value separately. It also allows overriding the target topic,
  so that one producer instance can send data to multiple topics.
  
+### Kafka Producer Partitioning Scheme
+ 
+By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use
+a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition
+(i.e., all records received by a sink subtask will end up in the same Kafka partition).
+
+A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. The
+constructors for all Kafka versions allow providing a custom partitioner when instantiating the producer.
+Note that the partitioner implementation must be serializable, as it will be transferred across Flink nodes.
+Also, keep in mind that any state in the partitioner will be lost on job failures, since the partitioner
+is not part of the producer's checkpointed state.
+
+It is also possible to avoid using any kind of partitioner entirely, and simply let Kafka partition
+the written records by their attached key (as determined for each record using the provided serialization schema).
+To do this, provide a `null` custom partitioner when instantiating the producer. It is important
+to explicitly provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified,
+the `FlinkFixedPartitioner` is used instead.
+
 ### Kafka Producers and Fault Tolerance
 
 #### Kafka 0.8
@@ -522,17 +535,22 @@ With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaPr
 can provide at-least-once delivery guarantees.
 
 Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately,
-as shown in the above examples in the previous section:
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
 
- * `setLogFailuresOnly(boolean)`: enabling this will let the producer log failures only
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+ Enabling this will let the producer only log failures
 instead of catching and rethrowing them. This essentially marks the record
 as having succeeded, even if it was never written to the target Kafka topic. This
  must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: with this enabled, Flink's checkpoints will wait for any
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `false`.
+ With this enabled, Flink's checkpoints will wait for any
  on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
  succeeding the checkpoint. This ensures that all records before the checkpoint have
  been written to Kafka. This must be enabled for at-least-once.
+ 
+In conclusion, to configure the Kafka producer to have at-least-once guarantees for versions
+0.9 and 0.10, `setLogFailuresOnly` must be set to `false` and `setFlushOnCheckpoint` must be set
+to `true`.
 
 **Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
 the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
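
For illustration, a minimal sketch of a custom partitioner as described in the new
"Kafka Producer Partitioning Scheme" section above. The MyEvent type and its getId()
accessor are hypothetical placeholders; the partition(...) signature is the one defined
by FlinkKafkaPartitioner:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// MyEvent is a placeholder event type with a String id field
public class MyCustomPartitioner extends FlinkKafkaPartitioner<MyEvent> {

	private static final long serialVersionUID = 1L;

	@Override
	public int partition(MyEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
		// 'partitions' holds the partition ids available for the target topic;
		// mask the sign bit so the index is always non-negative
		return partitions[(record.getId().hashCode() & Integer.MAX_VALUE) % partitions.length];
	}
}

As noted above, the partitioner must be serializable and should not rely on any state
that has to survive job failures.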

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 184a2e7..21e3a10 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -124,12 +126,20 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
@@ -139,10 +149,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -151,15 +169,26 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(
+			String topicId,
+			SerializationSchema<T> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
@@ -169,6 +198,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -184,6 +221,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -196,11 +241,29 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	}
 
 	/**
-	 * Create Kafka producer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
 	 *
-	 * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(
+			String topicId,
+			KeyedSerializationSchema<T> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
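
As a usage sketch for the constructors above: a key-less schema combined with an explicit
null partitioner yields round-robin distribution. The broker address and topic name are
placeholders:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// key-less schema + null partitioner: records carry no attached key, so they
// are distributed to Kafka partitions in a round-robin fashion
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
		"my-topic",
		new SimpleStringSchema(),
		props,
		(FlinkKafkaPartitioner<String>) null);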
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index ccf11e7..d0f5039 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -277,13 +277,21 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, SerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -296,15 +304,26 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+	public FlinkKafkaProducer011(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
@@ -314,6 +333,14 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -333,6 +360,14 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -352,12 +387,22 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional, Semantic, int)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 			User defined serialization schema supporting key/value messages
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
+	 * @param semantic
+	 * 			Defines the semantic that will be used by this producer (see {@link Semantic}).
 	 */
 	public FlinkKafkaProducer011(
 			String topicId,
@@ -374,12 +419,22 @@ public class FlinkKafkaProducer011<IN>
 
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
 	public FlinkKafkaProducer011(
 			String defaultTopicId,
@@ -396,12 +451,22 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 * @param semantic Defines the semantic that will be used by this producer (see {@link Semantic}).
 	 * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
 	 */
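
Note that the 0.11 producer takes the custom partitioner as an Optional rather than a
nullable reference. A usage sketch, with placeholder broker address and topic name:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// Optional.empty() means "no custom partitioner"; with a key-less schema this
// results in round-robin distribution across the topic's partitions
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
		"my-topic",
		new SimpleStringSchema(),
		props,
		Optional.empty());

// Optional.of(myPartitioner) would install a custom FlinkKafkaPartitioner instead.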

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 20900f0..fa80252 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,17 +40,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 
 	private static final long serialVersionUID = 1L;
 
-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ------------------- Key-less serialization schema constructors ----------------------
+
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
@@ -58,10 +69,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -70,14 +89,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
@@ -88,6 +119,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -103,6 +142,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -115,14 +162,29 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
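
To make the keyed path above concrete: a KeyedSerializationSchema that attaches a key,
combined with a null partitioner, lets Kafka partition records by that key. MyEvent and
its getId()/getPayload() accessors are hypothetical placeholders:

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

// MyEvent is a placeholder event type with String id and payload fields
public class MyKeyedSchema implements KeyedSerializationSchema<MyEvent> {

	@Override
	public byte[] serializeKey(MyEvent element) {
		// this key is what Kafka partitions on when no custom partitioner is given
		return element.getId().getBytes(StandardCharsets.UTF_8);
	}

	@Override
	public byte[] serializeValue(MyEvent element) {
		return element.getPayload().getBytes(StandardCharsets.UTF_8);
	}

	@Override
	public String getTargetTopic(MyEvent element) {
		return null; // null means "use the producer's default topic"
	}
}

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// pass null as the partitioner so partitioning is driven by the serialized key
FlinkKafkaProducer08<MyEvent> producer = new FlinkKafkaProducer08<>(
		"my-topic", new MyKeyedSchema(), props, null);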
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 407bad5..19f445f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,31 +40,47 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 
 	private static final long serialVersionUID = 1L;
 
-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ------------------- Key-less serialization schema constructors ----------------------
 
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -71,15 +89,26 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
@@ -90,6 +119,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -105,6 +142,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -117,15 +162,29 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}