Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/06 06:49:43 UTC

[1/9] flink git commit: [FLINK-8116] [DataStream] Fix stale comments referring to Checkpointed interface

Repository: flink
Updated Branches:
  refs/heads/release-1.4 4b2178677 -> 74135c9db


[FLINK-8116] [DataStream] Fix stale comments referring to Checkpointed interface


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6884ce98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6884ce98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6884ce98

Branch: refs/heads/release-1.4
Commit: 6884ce98f3936248a534e259789b9adde1ff9514
Parents: 4b21786
Author: Ankit Parashar <an...@gmail.com>
Authored: Mon Dec 4 23:46:16 2017 +0530
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:01:43 2018 -0800

----------------------------------------------------------------------
 docs/ops/state/state_backends.md                      |  2 +-
 .../api/functions/source/SourceFunction.java          | 14 +++++++-------
 .../api/scala/StreamExecutionEnvironment.scala        |  2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6884ce98/docs/ops/state/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index b32ad9f..cc2ffde 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -26,7 +26,7 @@ Programs written in the [Data Stream API]({{ site.baseurl }}/dev/datastream_api.
 
 - Windows gather elements or aggregates until they are triggered
 - Transformation functions may use the key/value state interface to store values
-- Transformation functions may implement the `Checkpointed` interface to make their local variables fault tolerant
+- Transformation functions may implement the `CheckpointedFunction` interface to make their local variables fault tolerant
 
 See also [state section]({{ site.baseurl }}/dev/stream/state/index.html) in the streaming API guide.
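
For illustration only (not part of this commit, class and state names are made up): a minimal sketch of a transformation function that keeps a local counter fault tolerant by implementing CheckpointedFunction. The pattern mirrors the operator-state API used in the Javadoc example added later in this series.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

        private long count = 0L;                              // local variable to make fault tolerant
        private transient ListState<Long> checkpointedCount;  // Flink-managed operator state

        @Override
        public String map(String value) {
            count++;
            return value;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // copy the local value into managed state on every checkpoint
            checkpointedCount.clear();
            checkpointedCount.add(count);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));

            if (context.isRestored()) {
                for (Long restored : checkpointedCount.get()) {
                    count = restored;
                }
            }
        }
    }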
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6884ce98/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 4665cc6..cb2e15f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -34,9 +34,9 @@ import java.io.Serializable;
  * The run method can run for as long as necessary. The source must, however, react to an
  * invocation of {@link #cancel()} by breaking out of its main loop.
  *
- * <h3>Checkpointed Sources</h3>
+ * <h3>CheckpointedFunction Sources</h3>
  *
- * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
+ * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
  * interface must ensure that state checkpointing, updating of internal state and emission of
  * elements are not done concurrently. This is achieved by using the provided checkpointing lock
  * object to protect update of state and emission of elements in a synchronized block.
@@ -44,7 +44,7 @@ import java.io.Serializable;
  * <p>This is the basic pattern one should follow when implementing a (checkpointed) source:
  *
  * <pre>{@code
- *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
+ *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction {
  *      private long count = 0L;
  *      private volatile boolean isRunning = true;
  *
@@ -61,9 +61,9 @@ import java.io.Serializable;
  *          isRunning = false;
  *      }
  *
- *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
+ *      public void snapshotState(FunctionSnapshotContext context) {  }
  *
- *      public void restoreState(Long state) { this.count = state; }
+ *      public void initializeState(FunctionInitializationContext context) {  }
  * }
  * }</pre>
  *
@@ -96,12 +96,12 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 * Starts the source. Implementations can use the {@link SourceContext} emit
 	 * elements.
 	 *
-	 * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
+	 * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 	 * must lock on the checkpoint lock (using a synchronized block) before updating internal
 	 * state and emitting elements, to make both an atomic operation:
 	 *
 	 * <pre>{@code
-	 *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
+	 *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction<Long> {
 	 *      private long count = 0L;
 	 *      private volatile boolean isRunning = true;
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/6884ce98/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 9fd03c3..3bba505 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -229,7 +229,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * [[KeyedStream]] is maintained (heap, managed memory, externally), and where state
    * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
    * functions (implementing the interface 
-   * [[org.apache.flink.streaming.api.checkpoint.Checkpointed]].
+   * [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction]].
    *
    * <p>The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]] for example
    * maintains the state in heap memory, as objects. It is lightweight without extra 
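
As a sketch of what this Scaladoc describes (shown with the Java API for consistency with the other examples here; illustrative only, not part of this commit): configuring a state backend on the execution environment, using the MemoryStateBackend mentioned above.

    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // keep key/value state and CheckpointedFunction snapshots as objects on the heap
    env.setStateBackend(new MemoryStateBackend());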


[4/9] flink git commit: [FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour

Posted by tz...@apache.org.
[FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71974895
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71974895
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71974895

Branch: refs/heads/release-1.4
Commit: 71974895da966478f2e24fd36c08d7cf386a7050
Parents: c454ee3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:21:20 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:03:20 2018 -0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    | 46 +++++++----
 .../connectors/kafka/FlinkKafkaProducer010.java | 83 ++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer011.java | 87 +++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer08.java  | 82 +++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer09.java  | 83 ++++++++++++++++---
 5 files changed, 324 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index c6461f9..e2df5fd 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -462,11 +462,8 @@ FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false);   // "false" by default
-myProducer.setFlushOnCheckpoint(true);  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true);
 
 stream.addSink(myProducer);
@@ -481,11 +478,8 @@ val myProducer = new FlinkKafkaProducer011[String](
         "my-topic",               // target topic
         new SimpleStringSchema)   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false)   // "false" by default
-myProducer.setFlushOnCheckpoint(true)  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true)
 
 stream.addSink(myProducer)
@@ -505,11 +499,30 @@ are other constructor variants that allow providing the following:
  partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
  constructor. This partitioner will be called for each record in the stream
  to determine which exact partition of the target topic the record should be sent to.
+ Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
  * *Advanced serialization schema*: Similar to the consumer,
  the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
  which allows serializing the key and value separately. It also allows to override the target topic,
  so that one producer instance can send data to multiple topics.
  
+### Kafka Producer Partitioning Scheme
+ 
+By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use
+a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition
+(i.e., all records received by a sink subtask will end up in the same Kafka partition).
+
+A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. All
+Kafka versions' constructors allow providing a custom partitioner when instantiating the producer.
+Note that the partitioner implementation must be serializable, as it will be transferred across Flink nodes.
+Also, keep in mind that any state in the partitioner will be lost on job failures since the partitioner
+is not part of the producer's checkpointed state.
+
+It is also possible to completely avoid using any kind of partitioner, and simply let Kafka partition
+the written records by their attached key (as determined for each record using the provided serialization schema).
+To do this, provide a `null` custom partitioner when instantiating the producer. It is important
+to provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified
+the `FlinkFixedPartitioner` is used instead.
+
 ### Kafka Producers and Fault Tolerance
 
 #### Kafka 0.8
@@ -522,17 +535,22 @@ With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaPr
 can provide at-least-once delivery guarantees.
 
 Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately,
-as shown in the above examples in the previous section:
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
 
- * `setLogFailuresOnly(boolean)`: enabling this will let the producer log failures only
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+ Enabling this will let the producer only log failures
  instead of catching and rethrowing them. This essentially accounts the record
  to have succeeded, even if it was never written to the target Kafka topic. This
  must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: with this enabled, Flink's checkpoints will wait for any
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `false`.
+ With this enabled, Flink's checkpoints will wait for any
  on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
  succeeding the checkpoint. This ensures that all records before the checkpoint have
  been written to Kafka. This must be enabled for at-least-once.
+ 
+In conclusion, to configure the Kafka producer to have at-least-once guarantees for versions
+0.9 and 0.10, `setLogFailuresOnly` must be set to `false` and `setFlushOnCheckpoint` must be set
+to `true`.
 
 **Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
 the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid
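
To make the at-least-once configuration described above concrete, here is a sketch for the 0.10 producer (illustrative only, not part of this commit; it assumes the usual imports, a placeholder broker address, and an existing DataStream<String> named stream):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);   // at-least-once also requires Flink checkpointing

    FlinkKafkaProducer010<String> myProducer = new FlinkKafkaProducer010<>(
            "localhost:9092",            // broker list (placeholder)
            "my-topic",                  // target topic
            new SimpleStringSchema());   // serialization schema

    myProducer.setLogFailuresOnly(false);   // rethrow write failures instead of only logging them
    myProducer.setFlushOnCheckpoint(true);  // wait for in-flight records before completing a checkpoint

    stream.addSink(myProducer);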

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 184a2e7..21e3a10 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -124,12 +126,20 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
@@ -139,10 +149,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -151,15 +169,26 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(
+			String topicId,
+			SerializationSchema<T> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
@@ -169,6 +198,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -184,6 +221,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -196,11 +241,29 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	}
 
 	/**
-	 * Create Kafka producer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
 	 *
-	 * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(
+			String topicId,
+			KeyedSerializationSchema<T> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
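
A short sketch of the two behaviours these Javadocs describe for the 0.10 producer (illustrative only, not part of this commit; producerConfig is a placeholder Properties object and the usual imports are assumed):

    // default: the FlinkFixedPartitioner is used, so each sink subtask writes to a single Kafka partition
    new FlinkKafkaProducer010<String>("my-topic", new SimpleStringSchema(), producerConfig);

    // explicit null partitioner: the key-less SimpleStringSchema attaches no key,
    // so records are distributed over the Kafka partitions in a round-robin fashion
    new FlinkKafkaProducer010<String>("my-topic", new SimpleStringSchema(), producerConfig, null);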
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index b14e487..58355c9 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -277,13 +277,21 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, SerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -296,15 +304,26 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+	public FlinkKafkaProducer011(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
@@ -314,6 +333,14 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -333,6 +360,14 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -352,12 +387,22 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional, Semantic, int)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 			User defined serialization schema supporting key/value messages
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
+	 * @param semantic
+	 * 			Defines semantic that will be used by this producer (see {@link Semantic}).
 	 */
 	public FlinkKafkaProducer011(
 			String topicId,
@@ -374,12 +419,22 @@ public class FlinkKafkaProducer011<IN>
 
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
 	public FlinkKafkaProducer011(
 			String defaultTopicId,
@@ -396,12 +451,22 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
 	 * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
 	 */
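
Note that the 0.11 producer takes the custom partitioner as an Optional rather than a nullable argument. A sketch of the key-less constructor above (illustrative only, not part of this commit; producerConfig is a placeholder Properties object):

    // Optional.empty(): no Flink partitioner and no attached keys (key-less schema),
    // so records are distributed over the Kafka partitions in a round-robin fashion
    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
            "my-topic",
            new SimpleStringSchema(),
            producerConfig,
            Optional.empty());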

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index d2f17d2..42fb892 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,17 +40,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 
 	private static final long serialVersionUID = 1L;
 
-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ------------------- Key-less serialization schema constructors ----------------------
+
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
@@ -58,10 +69,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -70,14 +89,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
@@ -88,6 +119,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -103,6 +142,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -115,14 +162,29 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
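
For completeness, a sketch of a custom partitioner as described in the updated docs (illustrative only, not part of this commit; it assumes FlinkKafkaPartitioner exposes a partition(record, key, value, targetTopic, partitions) method, and remember that fields of such a class are not checkpointed):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class RecordHashPartitioner extends FlinkKafkaPartitioner<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // spread records deterministically over the available partitions of the target topic
            return partitions[Math.abs(record.hashCode() % partitions.length)];
        }
    }

An instance of such a class would then be passed as the customPartitioner argument of the constructors shown above.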
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 407bad5..19f445f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,31 +40,47 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 
 	private static final long serialVersionUID = 1L;
 
-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ------------------- Key-less serialization schema constructors ----------------------
 
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -71,15 +89,26 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
@@ -90,6 +119,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -105,6 +142,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -117,15 +162,29 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 


[8/9] flink git commit: [hotfix] [kafka] Add serialVersionUID to FlinkKafkaProducer010

Posted by tz...@apache.org.
[hotfix] [kafka] Add serialVersionUID to FlinkKafkaProducer010


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f462f771
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f462f771
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f462f771

Branch: refs/heads/release-1.4
Commit: f462f77194b76139591d478c4b66ef0676449849
Parents: 16a49de
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:38:32 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:04:05 2018 -0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer010.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f462f771/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 369ab89..e721340 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -41,6 +41,8 @@ import java.util.Properties;
  */
 public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 
+	private static final long serialVersionUID = 1L;
+
 	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
 	 */


[6/9] flink git commit: [hotfix] [kafka] Properly deprecate FlinkKafkaProducer010Configuration

Posted by tz...@apache.org.
[hotfix] [kafka] Properly deprecate FlinkKafkaProducer010Configuration

FlinkKafkaProducer010Configuration is the return type of the deprecated
writeToKafkaWithTimestamps factory methods. Therefore, the class should
be deprecated as well.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24d44935
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24d44935
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24d44935

Branch: refs/heads/release-1.4
Commit: 24d449353e89ae319b81c4d59da5403a243f8b07
Parents: 10f1acf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:34:17 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:03:50 2018 -0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer010.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24d44935/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 0e64aa5..369ab89 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -386,7 +386,11 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 *
 	 * <p>To enable the settings, this fake sink must override all the public methods
 	 * in {@link DataStreamSink}.</p>
+	 *
+	 * @deprecated This class is deprecated since the factory methods {@code writeToKafkaWithTimestamps}
+	 *             for the producer are also deprecated.
 	 */
+	@Deprecated
 	public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
 
 		private final FlinkKafkaProducer010 producer;


[2/9] flink git commit: [FLINK-8116] [DataStream] Provide proper checkpointed source function example in Javadocs

Posted by tz...@apache.org.
[FLINK-8116] [DataStream] Provide proper checkpointed source function example in Javadocs

This closes #5121.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e637c54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e637c54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e637c54

Branch: refs/heads/release-1.4
Commit: 1e637c54c2ad1b9a8d9ad6d3f9c8aa55605d7e8e
Parents: 6884ce9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 14:57:01 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:01:59 2018 -0800

----------------------------------------------------------------------
 .../api/functions/source/SourceFunction.java    | 48 +++++++++++++++++---
 1 file changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e637c54/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index cb2e15f..5a15df7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -41,15 +41,19 @@ import java.io.Serializable;
  * elements are not done concurrently. This is achieved by using the provided checkpointing lock
  * object to protect update of state and emission of elements in a synchronized block.
  *
- * <p>This is the basic pattern one should follow when implementing a (checkpointed) source:
+ * <p>This is the basic pattern one should follow when implementing a checkpointed source:
  *
  * <pre>{@code
- *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction {
+ *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
  *      private long count = 0L;
  *      private volatile boolean isRunning = true;
  *
+ *      private transient ListState<Long> checkpointedCount;
+ *
  *      public void run(SourceContext<T> ctx) {
  *          while (isRunning && count < 1000) {
+ *              // this synchronized block ensures that state checkpointing,
+ *              // internal state updates and emission of elements are an atomic operation
  *              synchronized (ctx.getCheckpointLock()) {
  *                  ctx.collect(count);
  *                  count++;
@@ -61,9 +65,22 @@ import java.io.Serializable;
  *          isRunning = false;
  *      }
  *
- *      public void snapshotState(FunctionSnapshotContext context) {  }
+ *      public void initializeState(FunctionInitializationContext context) {
+ *          this.checkpointedCount = context
+ *              .getOperatorStateStore()
+ *              .getListState(new ListStateDescriptor<>("count", Long.class));
+ *
+ *          if (context.isRestored()) {
+ *              for (Long count : this.checkpointedCount.get()) {
+ *                  this.count = count;
+ *              }
+ *          }
+ *      }
  *
- *      public void initializeState(FunctionInitializationContext context) {  }
+ *      public void snapshotState(FunctionSnapshotContext context) {
+ *          this.checkpointedCount.clear();
+ *          this.checkpointedCount.add(count);
+ *      }
  * }
  * }</pre>
  *
@@ -101,12 +118,16 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 * state and emitting elements, to make both an atomic operation:
 	 *
 	 * <pre>{@code
-	 *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction<Long> {
+	 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 	 *      private long count = 0L;
 	 *      private volatile boolean isRunning = true;
 	 *
+	 *      private transient ListState<Long> checkpointedCount;
+	 *
 	 *      public void run(SourceContext<T> ctx) {
 	 *          while (isRunning && count < 1000) {
+	 *              // this synchronized block ensures that state checkpointing,
+	 *              // internal state updates and emission of elements are an atomic operation
 	 *              synchronized (ctx.getCheckpointLock()) {
 	 *                  ctx.collect(count);
 	 *                  count++;
@@ -118,9 +139,22 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 *          isRunning = false;
 	 *      }
 	 *
-	 *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
+	 *      public void initializeState(FunctionInitializationContext context) {
+	 *          this.checkpointedCount = context
+	 *              .getOperatorStateStore()
+	 *              .getListState(new ListStateDescriptor<>("count", Long.class));
+	 *
+	 *          if (context.isRestored()) {
+	 *              for (Long count : this.checkpointedCount.get()) {
+	 *                  this.count = count;
+	 *              }
+	 *          }
+	 *      }
 	 *
-	 *      public void restoreState(Long state) { this.count = state; }
+	 *      public void snapshotState(FunctionSnapshotContext context) {
+	 *          this.checkpointedCount.clear();
+	 *          this.checkpointedCount.add(count);
+	 *      }
 	 * }
 	 * }</pre>
 	 *
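
A minimal sketch of running the ExampleCountSource from the Javadoc above in a
job. It assumes the class has been copied into the user's project (with its
SourceContext<T> parameter changed to SourceContext<Long> so it compiles); the
checkpoint interval and job name are illustrative only.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExampleCountSourceJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // without checkpointing enabled, initializeState()/snapshotState() of the
            // CheckpointedFunction source are never driven by the runtime
            env.enableCheckpointing(5000L);

            env.addSource(new ExampleCountSource())
               .print();

            env.execute("checkpointed-count-source");
        }
    }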


[3/9] flink git commit: [FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs

Posted by tz...@apache.org.
[FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c454ee3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c454ee3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c454ee3f

Branch: refs/heads/release-1.4
Commit: c454ee3fa9a14cfe28dbfc641134659252d2c80b
Parents: 1e637c5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 15:21:31 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:02:09 2018 -0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md | 46 ++++++++++-----------------------------
 1 file changed, 11 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c454ee3f/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index daf1903..c6461f9 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -447,17 +447,17 @@ if a new watermark should be emitted and with which timestamp.
 
 ## Kafka Producer
 
-Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 0.10.0.x versions, etc.).
 It allows writing a stream of records to one or more Kafka topics.
 
 Example:
 
 <div class="codetabs" markdown="1">
-<div data-lang="java, Kafka 0.8+" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 DataStream<String> stream = ...;
 
-FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "localhost:9092",            // broker list
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema
@@ -466,29 +466,17 @@ FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
 myProducer.setLogFailuresOnly(false);   // "false" by default
 myProducer.setFlushOnCheckpoint(true);  // "false" by default
 
-stream.addSink(myProducer);
-{% endhighlight %}
-</div>
-<div data-lang="java, Kafka 0.10+" markdown="1">
-{% highlight java %}
-DataStream<String> stream = ...;
-
-FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
-        stream,                     // input stream
-        "my-topic",                 // target topic
-        new SimpleStringSchema(),   // serialization schema
-        properties);                // custom configuration for KafkaProducer (including broker list)
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true);
 
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false);   // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
+stream.addSink(myProducer);
 {% endhighlight %}
 </div>
-<div data-lang="scala, Kafka 0.8+" markdown="1">
+<div data-lang="scala" markdown="1">
 {% highlight scala %}
 val stream: DataStream[String] = ...
 
-val myProducer = new FlinkKafkaProducer08[String](
+val myProducer = new FlinkKafkaProducer011[String](
         "localhost:9092",         // broker list
         "my-topic",               // target topic
         new SimpleStringSchema)   // serialization schema
@@ -497,22 +485,10 @@ val myProducer = new FlinkKafkaProducer08[String](
 myProducer.setLogFailuresOnly(false)   // "false" by default
 myProducer.setFlushOnCheckpoint(true)  // "false" by default
 
-stream.addSink(myProducer)
-{% endhighlight %}
-</div>
-<div data-lang="scala, Kafka 0.10+" markdown="1">
-{% highlight scala %}
-val stream: DataStream[String] = ...
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true)
 
-val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
-        stream,                   // input stream
-        "my-topic",               // target topic
-        new SimpleStringSchema,   // serialization schema
-        properties)               // custom configuration for KafkaProducer (including broker list)
-
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false)   // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
+stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>
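
To tie the updated producer example into a complete pipeline, a short sketch that
reads from one topic and writes to another with the 0.11 connector. The topic
names, consumer group id and broker address are placeholders; the producer calls
are the ones shown in the updated documentation above.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaToKafkaJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties consumerProps = new Properties();
            consumerProps.setProperty("bootstrap.servers", "localhost:9092");
            consumerProps.setProperty("group.id", "example-group");

            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), consumerProps));

            FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
                    "localhost:9092",            // broker list
                    "output-topic",              // target topic
                    new SimpleStringSchema());   // serialization schema

            // versions 0.10+ allow attaching the records' event timestamp when writing to Kafka
            producer.setWriteTimestampToKafka(true);

            stream.addSink(producer);
            env.execute("kafka-to-kafka");
        }
    }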


[7/9] flink git commit: [hotfix] [kafka] Fix stale Javadoc link in FlinkKafkaProducer09

Posted by tz...@apache.org.
[hotfix] [kafka] Fix stale Javadoc link in FlinkKafkaProducer09

The previous link referenced a non-existent constructor signature.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16a49de3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16a49de3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16a49de3

Branch: refs/heads/release-1.4
Commit: 16a49de3f66e29108f6fb8977a51a2652ed3f312
Parents: 24d4493
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:36:31 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:03:58 2018 -0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer09.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16a49de3/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 19f445f..7f00c92 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -219,7 +219,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 *
 	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
 	 *             producing to multiple topics. Use
-	 *             {@link #FlinkKafkaProducer09(String, org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *             {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
 	@Deprecated
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
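
For background on the fix: a Javadoc {@link} to a constructor only resolves if
the parameter types name an overload that actually exists. A small illustrative
snippet with a hypothetical class (not from the commit):

    public class Widget {

        /** Preferred constructor. */
        public Widget(String name, int size) {
        }

        /**
         * @deprecated Use {@link #Widget(String, int)} instead.
         */
        @Deprecated
        public Widget(String name) {
            this(name, 0);
        }
        // A reference such as {@link #Widget(CharSequence, long)} would be exactly the
        // kind of stale link this hotfix removes: no overload with that signature exists.
    }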


[9/9] flink git commit: [FLINK-8283] [kafka] Stabilize FlinkKafkaConsumerBaseTest::testScaleUp()

Posted by tz...@apache.org.
[FLINK-8283] [kafka] Stabilize FlinkKafkaConsumerBaseTest::testScaleUp()

Previously, the testScaleUp() test consumed too many resources, causing
the test environment to be terminated before the test could finish.
This commit lowers the intensity of the test while still retaining the
verified behaviour (i.e., restoring the Kafka consumer with higher
parallelism and more Kafka partitions).

This closes #5201.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74135c9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74135c9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74135c9d

Branch: refs/heads/release-1.4
Commit: 74135c9db11728f2189b6b4ccae90b1d4ccb84c1
Parents: f462f77
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Dec 21 13:41:48 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:04:51 2018 -0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74135c9d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 168cfd5..6ccfeb1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -516,7 +516,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 	@Test
 	public void testScaleUp() throws Exception {
-		testRescaling(5, 2, 15, 1000);
+		testRescaling(5, 2, 8, 30);
 	}
 
 	@Test
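
To illustrate the scenario the test covers, a self-contained sketch of a
round-robin partition-to-subtask assignment. The modulo assignment and the
reading of the testRescaling(...) arguments as (initial parallelism, initial
partitions, restored parallelism, restored partitions) are assumptions for
illustration, not Flink's actual redistribution logic.

    import java.util.ArrayList;
    import java.util.List;

    public class RescalingSketch {

        /** Distributes partitions 0..numPartitions-1 over the given number of subtasks. */
        static List<List<Integer>> assign(int numPartitions, int parallelism) {
            List<List<Integer>> assignment = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                assignment.add(new ArrayList<>());
            }
            for (int partition = 0; partition < numPartitions; partition++) {
                assignment.get(partition % parallelism).add(partition);
            }
            return assignment;
        }

        public static void main(String[] args) {
            System.out.println(assign(2, 5));   // before restore: 2 partitions, 5 subtasks
            System.out.println(assign(30, 8));  // after restore: 30 partitions, 8 subtasks
        }
    }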


[5/9] flink git commit: [FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010

Posted by tz...@apache.org.
[FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010

This commit moves the deprecated factory methods of FlinkKafkaProducer010
behind the regular constructors, to improve navigation and readability of
the code.

This closes #5179.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10f1acf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10f1acf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10f1acf9

Branch: refs/heads/release-1.4
Commit: 10f1acf92313cde7bf4ac8aa1403b19405d2ed25
Parents: 7197489
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:29:38 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:03:40 2018 -0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 167 +++++++++----------
 1 file changed, 82 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10f1acf9/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 21e3a10..0e64aa5 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -46,80 +46,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 */
 	private boolean writeTimestampToKafka = false;
 
-	// ---------------------- "Constructors" for timestamp writing ------------------
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 * @param inStream The stream to write to Kafka
-	 * @param topicId ID of the Kafka topic.
-	 * @param serializationSchema User defined serialization schema supporting key/value messages
-	 * @param producerConfig Properties with the producer configuration.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 * @param inStream The stream to write to Kafka
-	 * @param topicId ID of the Kafka topic.
-	 * @param serializationSchema User defined (keyless) serialization schema.
-	 * @param producerConfig Properties with the producer configuration.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					SerializationSchema<T> serializationSchema,
-																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 *  @param inStream The stream to write to Kafka
-	 *  @param topicId The name of the target topic
-	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig,
-																					FlinkKafkaPartitioner<T> customPartitioner) {
-
-		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
-		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
-		return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
-
-	}
-
 	// ---------------------- Regular constructors------------------
 
 	/**
@@ -267,6 +193,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	// ------------------- User configuration ----------------------
+
+	/**
+	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+	 * Timestamps must be positive for Kafka to accept them.
+	 *
+	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+	 */
+	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+		this.writeTimestampToKafka = writeTimestampToKafka;
+	}
+
 	// ----------------------------- Deprecated constructors / factory methods  ---------------------------
 
 	/**
@@ -275,6 +213,76 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 *
 	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
 	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined serialization schema supporting key/value messages
+	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined (keyless) serialization schema.
+	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					SerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					FlinkKafkaPartitioner<T> customPartitioner) {
+		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
 	 *  @param inStream The stream to write to Kafka
 	 *  @param topicId The name of the target topic
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
@@ -332,17 +340,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
-	/**
-	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
-	 * Timestamps must be positive for Kafka to accept them.
-	 *
-	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
-	 */
-	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
-		this.writeTimestampToKafka = writeTimestampToKafka;
-	}
-
-
 	// ----------------------------- Generic element processing  ---------------------------
 
 	@Override