You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/06 05:51:54 UTC

[01/11] flink git commit: [FLINK-8116] [DataStream] Provide proper checkpointed source function example in Javadocs

Repository: flink
Updated Branches:
  refs/heads/master 9ae4c5447 -> 542419ba0


[FLINK-8116] [DataStream] Provide proper checkpointed source function example in Javadocs

This closes #5121.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b5fdbd6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b5fdbd6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b5fdbd6

Branch: refs/heads/master
Commit: 7b5fdbd6501c33f74700d79aedc9411d160191ba
Parents: ac3b721
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 14:57:01 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:13:48 2018 -0800

----------------------------------------------------------------------
 .../api/functions/source/SourceFunction.java    | 48 +++++++++++++++++---
 1 file changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b5fdbd6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index cb2e15f..5a15df7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -41,15 +41,19 @@ import java.io.Serializable;
  * elements are not done concurrently. This is achieved by using the provided checkpointing lock
  * object to protect update of state and emission of elements in a synchronized block.
  *
- * <p>This is the basic pattern one should follow when implementing a (checkpointed) source:
+ * <p>This is the basic pattern one should follow when implementing a checkpointed source:
  *
  * <pre>{@code
- *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction {
+ *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
  *      private long count = 0L;
  *      private volatile boolean isRunning = true;
  *
+ *      private transient ListState<Long> checkpointedCount;
+ *
  *      public void run(SourceContext<T> ctx) {
  *          while (isRunning && count < 1000) {
+ *              // this synchronized block ensures that state checkpointing,
+ *              // internal state updates and emission of elements are an atomic operation
  *              synchronized (ctx.getCheckpointLock()) {
  *                  ctx.collect(count);
  *                  count++;
@@ -61,9 +65,22 @@ import java.io.Serializable;
  *          isRunning = false;
  *      }
  *
- *      public void snapshotState(FunctionSnapshotContext context) {  }
+ *      public void initializeState(FunctionInitializationContext context) {
+ *          this.checkpointedCount = context
+ *              .getOperatorStateStore()
+ *              .getListState(new ListStateDescriptor<>("count", Long.class));
+ *
+ *          if (context.isRestored()) {
+ *              for (Long count : this.checkpointedCount.get()) {
+ *                  this.count = count;
+ *              }
+ *          }
+ *      }
  *
- *      public void initializeState(FunctionInitializationContext context) {  }
+ *      public void snapshotState(FunctionSnapshotContext context) {
+ *          this.checkpointedCount.clear();
+ *          this.checkpointedCount.add(count);
+ *      }
  * }
  * }</pre>
  *
@@ -101,12 +118,16 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 * state and emitting elements, to make both an atomic operation:
 	 *
 	 * <pre>{@code
-	 *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction<Long> {
+	 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 	 *      private long count = 0L;
 	 *      private volatile boolean isRunning = true;
 	 *
+	 *      private transient ListState<Long> checkpointedCount;
+	 *
 	 *      public void run(SourceContext<T> ctx) {
 	 *          while (isRunning && count < 1000) {
+	 *              // this synchronized block ensures that state checkpointing,
+	 *              // internal state updates and emission of elements are an atomic operation
 	 *              synchronized (ctx.getCheckpointLock()) {
 	 *                  ctx.collect(count);
 	 *                  count++;
@@ -118,9 +139,22 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 *          isRunning = false;
 	 *      }
 	 *
-	 *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
+	 *      public void initializeState(FunctionInitializationContext context) {
+	 *          this.checkpointedCount = context
+	 *              .getOperatorStateStore()
+	 *              .getListState(new ListStateDescriptor<>("count", Long.class));
+	 *
+	 *          if (context.isRestored()) {
+	 *              for (Long count : this.checkpointedCount.get()) {
+	 *                  this.count = count;
+	 *              }
+	 *          }
+	 *      }
 	 *
-	 *      public void restoreState(Long state) { this.count = state; }
+	 *      public void snapshotState(FunctionSnapshotContext context) {
+	 *          this.checkpointedCount.clear();
+	 *          this.checkpointedCount.add(count);
+	 *      }
 	 * }
 	 * }</pre>
 	 *


[03/11] flink git commit: [FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs

Posted by tz...@apache.org.
[FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b49ead38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b49ead38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b49ead38

Branch: refs/heads/master
Commit: b49ead387a30ff0d4f204e82fd8430012380eb77
Parents: 7b5fdbd
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 15:21:31 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:14:20 2018 -0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md | 46 ++++++++++-----------------------------
 1 file changed, 11 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b49ead38/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index daf1903..c6461f9 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -447,17 +447,17 @@ if a new watermark should be emitted and with which timestamp.
 
 ## Kafka Producer
 
-Flink’s Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 0.10.0.x versions, etc.).
 It allows writing a stream of records to one or more Kafka topics.
 
 Example:
 
 <div class="codetabs" markdown="1">
-<div data-lang="java, Kafka 0.8+" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 DataStream<String> stream = ...;
 
-FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "localhost:9092",            // broker list
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema
@@ -466,29 +466,17 @@ FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
 myProducer.setLogFailuresOnly(false);   // "false" by default
 myProducer.setFlushOnCheckpoint(true);  // "false" by default
 
-stream.addSink(myProducer);
-{% endhighlight %}
-</div>
-<div data-lang="java, Kafka 0.10+" markdown="1">
-{% highlight java %}
-DataStream<String> stream = ...;
-
-FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
-        stream,                     // input stream
-        "my-topic",                 // target topic
-        new SimpleStringSchema(),   // serialization schema
-        properties);                // custom configuration for KafkaProducer (including broker list)
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true);
 
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false);   // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
+stream.addSink(myProducer);
 {% endhighlight %}
 </div>
-<div data-lang="scala, Kafka 0.8+" markdown="1">
+<div data-lang="scala" markdown="1">
 {% highlight scala %}
 val stream: DataStream[String] = ...
 
-val myProducer = new FlinkKafkaProducer08[String](
+val myProducer = new FlinkKafkaProducer011[String](
         "localhost:9092",         // broker list
         "my-topic",               // target topic
         new SimpleStringSchema)   // serialization schema
@@ -497,22 +485,10 @@ val myProducer = new FlinkKafkaProducer08[String](
 myProducer.setLogFailuresOnly(false)   // "false" by default
 myProducer.setFlushOnCheckpoint(true)  // "false" by default
 
-stream.addSink(myProducer)
-{% endhighlight %}
-</div>
-<div data-lang="scala, Kafka 0.10+" markdown="1">
-{% highlight scala %}
-val stream: DataStream[String] = ...
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+myProducer.setWriteTimestampToKafka(true)
 
-val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
-        stream,                   // input stream
-        "my-topic",               // target topic
-        new SimpleStringSchema,   // serialization schema
-        properties)               // custom configuration for KafkaProducer (including broker list)
-
-// the following is necessary for at-least-once delivery guarantee
-myProducerConfig.setLogFailuresOnly(false)   // "false" by default
-myProducerConfig.setFlushOnCheckpoint(true)  // "false" by default
+stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>


[08/11] flink git commit: [hotfix] [kafka] Fix stale Javadoc link in FlinkKafkaProducer09

Posted by tz...@apache.org.
[hotfix] [kafka] Fix stale Javadoc link in FlinkKafkaProducer09

The previous link was referencing a non-existent constructor signature.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7a6df1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7a6df1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7a6df1a

Branch: refs/heads/master
Commit: f7a6df1a63880774132357e711abb30c0d831ee7
Parents: dfe3e62
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:36:31 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:19:25 2018 -0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer09.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7a6df1a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 19f445f..7f00c92 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -219,7 +219,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 *
 	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
 	 *             producing to multiple topics. Use
-	 *             {@link #FlinkKafkaProducer09(String, org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *             {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
 	@Deprecated
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {


[06/11] flink git commit: [hotfix] [kafka] Properly deprecate FlinkKafkaProducer010Configuration

Posted by tz...@apache.org.
[hotfix] [kafka] Properly deprecate FlinkKafkaProducer010Configuration

FlinkKafkaProducer010Configuration is the return type of the deprecated
writeToKafkaWithTimestamp factory methods. Therefore, the class should
also be deprecated as well.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dfe3e623
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dfe3e623
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dfe3e623

Branch: refs/heads/master
Commit: dfe3e623ea15471bc84c56256b3872df301372c0
Parents: 8d42197
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:34:17 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:19:24 2018 -0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer010.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dfe3e623/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 0e64aa5..369ab89 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -386,7 +386,11 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 *
 	 * <p>To enable the settings, this fake sink must override all the public methods
 	 * in {@link DataStreamSink}.</p>
+	 *
+	 * @deprecated This class is deprecated since the factory methods {@code writeToKafkaWithTimestamps}
+	 *             for the producer are also deprecated.
 	 */
+	@Deprecated
 	public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
 
 		private final FlinkKafkaProducer010 producer;


[11/11] flink git commit: [FLINK-8283] [kafka] Stabalize FlinkKafkaConsumerBaseTest::testScaleUp()

Posted by tz...@apache.org.
[FLINK-8283] [kafka] Stabalize FlinkKafkaConsumerBaseTest::testScaleUp()

Previously, the testScaleUp() test was taking too much resources and
causing test resources to be terminated before the test could finish.
This commit lowers the intensity of the test, while still retaining the
verified behaviour (i.e., when restoring the Kafka consumer with higher
parallelism and more Kafka partitions).

This closes #5201.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/542419ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/542419ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/542419ba

Branch: refs/heads/master
Commit: 542419ba07b1c0b0ba68b636d14de8f1a00aaae1
Parents: 091a370
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Dec 21 13:41:48 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:34:17 2018 -0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/542419ba/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 168cfd5..6ccfeb1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -516,7 +516,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 	@Test
 	public void testScaleUp() throws Exception {
-		testRescaling(5, 2, 15, 1000);
+		testRescaling(5, 2, 8, 30);
 	}
 
 	@Test


[04/11] flink git commit: [FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour

Posted by tz...@apache.org.
[FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f68e790
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f68e790
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f68e790

Branch: refs/heads/master
Commit: 9f68e790fc28197f89638cc83d1612f8f7a796a8
Parents: b49ead3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:21:20 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:15:42 2018 -0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    | 46 +++++++----
 .../connectors/kafka/FlinkKafkaProducer010.java | 83 ++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer011.java | 87 +++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer08.java  | 78 ++++++++++++++++--
 .../connectors/kafka/FlinkKafkaProducer09.java  | 83 ++++++++++++++++---
 5 files changed, 322 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index c6461f9..e2df5fd 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -462,11 +462,8 @@ FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false);   // "false" by default
-myProducer.setFlushOnCheckpoint(true);  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true);
 
 stream.addSink(myProducer);
@@ -481,11 +478,8 @@ val myProducer = new FlinkKafkaProducer011[String](
         "my-topic",               // target topic
         new SimpleStringSchema)   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false)   // "false" by default
-myProducer.setFlushOnCheckpoint(true)  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true)
 
 stream.addSink(myProducer)
@@ -505,11 +499,30 @@ are other constructor variants that allow providing the following:
  partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
  constructor. This partitioner will be called for each record in the stream
  to determine which exact partition of the target topic the record should be sent to.
+ Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
  * *Advanced serialization schema*: Similar to the consumer,
  the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
  which allows serializing the key and value separately. It also allows to override the target topic,
  so that one producer instance can send data to multiple topics.
  
+### Kafka Producer Partitioning Scheme
+ 
+By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use
+a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition
+(i.e., all records received by a sink subtask will end up in the same Kafka partition).
+
+A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. All
+Kafka versions' constructors allow providing a custom partitioner when instantiating the producer.
+Note that the partitioner implementation must be serializable, as they will be transferred across Flink nodes.
+Also, keep in mind that any state in the partitioner will be lost on job failures since the partitioner
+is not part of the producer's checkpointed state.
+
+It is also possible to completely avoid using and kind of partitioner, and simply let Kafka partition
+the written records by their attached key (as determined for each record using the provided serialization schema).
+To do this, provide a `null` custom partitioner when instantiating the producer. It is important
+to provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified
+the `FlinkFixedPartitioner` is used instead.
+
 ### Kafka Producers and Fault Tolerance
 
 #### Kafka 0.8
@@ -522,17 +535,22 @@ With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaPr
 can provide at-least-once delivery guarantees.
 
 Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately,
-as shown in the above examples in the previous section:
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
 
- * `setLogFailuresOnly(boolean)`: enabling this will let the producer log failures only
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+ Enabling this will let the producer only log failures
  instead of catching and rethrowing them. This essentially accounts the record
  to have succeeded, even if it was never written to the target Kafka topic. This
  must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: with this enabled, Flink's checkpoints will wait for any
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `false`.
+ With this enabled, Flink's checkpoints will wait for any
  on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
  succeeding the checkpoint. This ensures that all records before the checkpoint have
  been written to Kafka. This must be enabled for at-least-once.
+ 
+In conclusion, to configure the Kafka producer to have at-least-once guarantees for versions
+0.9 and 0.10, `setLogFailureOnly` must be set to `false` and `setFlushOnCheckpoint` must be set
+to `true`.
 
 **Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
 the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 184a2e7..21e3a10 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -124,12 +126,20 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
@@ -139,10 +149,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -151,15 +169,26 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(
+			String topicId,
+			SerializationSchema<T> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
@@ -169,6 +198,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -184,6 +221,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -196,11 +241,29 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	}
 
 	/**
-	 * Create Kafka producer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
 	 *
-	 * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(
+			String topicId,
+			KeyedSerializationSchema<T> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index ccf11e7..d0f5039 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -277,13 +277,21 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, SerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -296,15 +304,26 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+	public FlinkKafkaProducer011(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
@@ -314,6 +333,14 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -333,6 +360,14 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -352,12 +387,22 @@ public class FlinkKafkaProducer011<IN>
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional, Semantic, int)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 			User defined serialization schema supporting key/value messages
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
+	 * @param semantic
+	 * 			Defines semantic that will be used by this producer (see {@link Semantic}).
 	 */
 	public FlinkKafkaProducer011(
 			String topicId,
@@ -374,12 +419,22 @@ public class FlinkKafkaProducer011<IN>
 
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
 	public FlinkKafkaProducer011(
 			String defaultTopicId,
@@ -396,12 +451,22 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If a partitioner is not provided, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
 	 * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 20900f0..fa80252 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,17 +40,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 
 	private static final long serialVersionUID = 1L;
 
-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ------------------- Key-less serialization schema constructors ----------------------
+
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
@@ -58,10 +69,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -70,14 +89,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
@@ -88,6 +119,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -103,6 +142,14 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -115,14 +162,29 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	}
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f68e790/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 407bad5..19f445f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,31 +40,47 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 
 	private static final long serialVersionUID = 1L;
 
-	// ------------------- Keyless serialization schema constructors ----------------------
+	// ------------------- Key-less serialization schema constructors ----------------------
 
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined (keyless) serialization schema.
+	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
@@ -71,15 +89,26 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be distributed to Kafka partitions
+	 *                          in a round-robin fashion.
 	 */
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
@@ -90,6 +119,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param brokerList
 	 *			Comma separated addresses of the brokers
 	 * @param topicId
@@ -105,6 +142,14 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
+	 * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+	 * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+	 * partition (i.e. all records received by a sink subtask will end up in the same
+	 * Kafka partition).
+	 *
+	 * <p>To use a custom partitioner, please use
+	 * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 *
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
@@ -117,15 +162,29 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	}
 
 	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+	 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+	 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+	 * will be distributed to Kafka partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *                          If set to {@code null}, records will be partitioned by the key of each record
+	 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+	 *                          are {@code null}, then records will be distributed to Kafka partitions in a
+	 *                          round-robin fashion.
 	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 


[10/11] flink git commit: [FLINK-8298][tests] Properly shutdown MockEnvironment to release resources

Posted by tz...@apache.org.
[FLINK-8298][tests] Properly shutdown MockEnvironment to release resources

This closes #5193.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/091a3705
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/091a3705
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/091a3705

Branch: refs/heads/master
Commit: 091a37052b7045b3ed28c68bfea109024a5d1871
Parents: e8d1aa5
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Dec 20 12:27:56 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:30:12 2018 -0800

----------------------------------------------------------------------
 .../operators/testutils/MockEnvironment.java    |  19 +-
 .../operators/testutils/TaskTestBase.java       |  20 +-
 .../source/InputFormatSourceFunctionTest.java   |  37 ++--
 .../operators/StreamOperatorChainingTest.java   |  67 ++++---
 .../operators/windowing/WindowOperatorTest.java | 200 ++++++++++---------
 .../streaming/runtime/tasks/StreamTaskTest.java |  63 +++---
 .../util/AbstractStreamOperatorTestHarness.java |  25 ++-
 .../streaming/util/SourceFunctionUtil.java      |  27 ++-
 .../PojoSerializerUpgradeTest.java              |  73 +++----
 9 files changed, 296 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 861cf35..2fdddb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -63,6 +63,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -70,7 +71,10 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class MockEnvironment implements Environment {
+/**
+ * IMPORTANT! Remember to close environment after usage!
+ */
+public class MockEnvironment implements Environment, AutoCloseable {
 	
 	private final TaskInfo taskInfo;
 	
@@ -376,4 +380,17 @@ public class MockEnvironment implements Environment {
 	public void failExternally(Throwable cause) {
 		throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
 	}
+
+	@Override
+	public void close() {
+		// close() method should be idempotent and calling memManager.verifyEmpty() will throw after it was shutdown.
+		if (!memManager.isShutdown()) {
+			checkState(memManager.verifyEmpty(), "Memory Manager managed memory was not completely freed.");
+		}
+
+		memManager.shutdown();
+		ioManager.shutdown();
+
+		checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 53d75b3..8d198d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
@@ -33,12 +32,13 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
-import org.junit.Assert;
 
 import java.util.List;
 
@@ -145,19 +145,7 @@ public abstract class TaskTestBase extends TestLogger {
 	}
 
 	@After
-	public void shutdownIOManager() throws Exception {
-		this.mockEnv.getIOManager().shutdown();
-		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
-	}
-
-	@After
-	public void shutdownMemoryManager() throws Exception {
-		if (this.memorySize > 0) {
-			MemoryManager memMan = getMemoryManager();
-			if (memMan != null) {
-				Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
-				memMan.shutdown();
-			}
-		}
+	public void shutdown() {
+		mockEnv.close();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index b99119e..82d11db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -63,26 +64,29 @@ public class InputFormatSourceFunctionTest {
 
 		final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
 		final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
-		reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));
 
-		Assert.assertTrue(!format.isConfigured);
-		Assert.assertTrue(!format.isInputFormatOpen);
-		Assert.assertTrue(!format.isSplitOpen);
+		try (MockEnvironment environment = new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16)) {
+			reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment));
 
-		reader.open(new Configuration());
-		Assert.assertTrue(format.isConfigured);
+			Assert.assertTrue(!format.isConfigured);
+			Assert.assertTrue(!format.isInputFormatOpen);
+			Assert.assertTrue(!format.isSplitOpen);
 
-		TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
-		reader.run(ctx);
+			reader.open(new Configuration());
+			Assert.assertTrue(format.isConfigured);
 
-		int splitsSeen = ctx.getSplitsSeen();
-		Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
+			TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
+			reader.run(ctx);
 
-		// we have exhausted the splits so the
-		// format and splits should be closed by now
+			int splitsSeen = ctx.getSplitsSeen();
+			Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
 
-		Assert.assertTrue(!format.isSplitOpen);
-		Assert.assertTrue(!format.isInputFormatOpen);
+			// we have exhausted the splits so the
+			// format and splits should be closed by now
+
+			Assert.assertTrue(!format.isSplitOpen);
+			Assert.assertTrue(!format.isInputFormatOpen);
+		}
 	}
 
 	private static class LifeCycleTestInputFormat extends RichInputFormat<Integer, InputSplit> {
@@ -255,10 +259,9 @@ public class InputFormatSourceFunctionTest {
 		private final LifeCycleTestInputFormat format;
 		private InputSplit[] inputSplits;
 
-		private MockRuntimeContext(LifeCycleTestInputFormat format, int noOfSplits) {
-
+		private MockRuntimeContext(LifeCycleTestInputFormat format, int noOfSplits, Environment environment) {
 			super(new MockStreamOperator(),
-				new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+				environment,
 				Collections.<String, Accumulator<?, ?>>emptyMap());
 
 			this.noOfSplits = noOfSplits;

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index 8d99acd..b237373 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -147,25 +147,30 @@ public class StreamOperatorChainingTest {
 		StreamMap<Integer, Integer> headOperator =
 				streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
 
-		StreamTask<Integer, StreamMap<Integer, Integer>> mockTask =
-				createMockTask(streamConfig, chainedVertex.getName());
+		try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) {
+			StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, environment);
 
-		OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
+			OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
 
-		headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
+			headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
 
-		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
-			if (operator != null) {
-				operator.open();
+			for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+				if (operator != null) {
+					operator.open();
+				}
 			}
-		}
 
-		headOperator.processElement(new StreamRecord<>(1));
-		headOperator.processElement(new StreamRecord<>(2));
-		headOperator.processElement(new StreamRecord<>(3));
+			headOperator.processElement(new StreamRecord<>(1));
+			headOperator.processElement(new StreamRecord<>(2));
+			headOperator.processElement(new StreamRecord<>(3));
+
+			assertThat(sink1Results, contains("First: 1", "First: 2", "First: 3"));
+			assertThat(sink2Results, contains("Second: 1", "Second: 2", "Second: 3"));
+		}
+	}
 
-		assertThat(sink1Results, contains("First: 1", "First: 2", "First: 3"));
-		assertThat(sink2Results, contains("Second: 1", "Second: 2", "Second: 3"));
+	private MockEnvironment createMockEnvironment(String taskName) {
+		return new MockEnvironment(taskName, 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
 	}
 
 	@Test
@@ -287,38 +292,40 @@ public class StreamOperatorChainingTest {
 		StreamMap<Integer, Integer> headOperator =
 				streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
 
-		StreamTask<Integer, StreamMap<Integer, Integer>> mockTask =
-				createMockTask(streamConfig, chainedVertex.getName());
+		try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) {
+			StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, environment);
 
-		OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
+			OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
 
-		headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
+			headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
 
-		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
-			if (operator != null) {
-				operator.open();
+			for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+				if (operator != null) {
+					operator.open();
+				}
 			}
-		}
 
-		headOperator.processElement(new StreamRecord<>(1));
-		headOperator.processElement(new StreamRecord<>(2));
-		headOperator.processElement(new StreamRecord<>(3));
+			headOperator.processElement(new StreamRecord<>(1));
+			headOperator.processElement(new StreamRecord<>(2));
+			headOperator.processElement(new StreamRecord<>(3));
 
-		assertThat(sink1Results, contains("First 1: 1"));
-		assertThat(sink2Results, contains("First 2: 1"));
-		assertThat(sink3Results, contains("Second: 2", "Second: 3"));
+			assertThat(sink1Results, contains("First 1: 1"));
+			assertThat(sink2Results, contains("First 2: 1"));
+			assertThat(sink3Results, contains("Second: 2", "Second: 3"));
+		}
 	}
 
-	private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(StreamConfig streamConfig, String taskName) {
+	private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(
+			StreamConfig streamConfig,
+			Environment environment) {
 		final Object checkpointLock = new Object();
-		final Environment env = new MockEnvironment(taskName, 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
 
 		@SuppressWarnings("unchecked")
 		StreamTask<IN, OT> mockTask = mock(StreamTask.class);
 		when(mockTask.getName()).thenReturn("Mock Task");
 		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(streamConfig);
-		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getEnvironment()).thenReturn(environment);
 		when(mockTask.getExecutionConfig()).thenReturn(new ExecutionConfig().enableObjectReuse());
 
 		return mockTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2fa1c3c..c03207e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -98,7 +99,13 @@ public class WindowOperatorTest extends TestLogger {
 	// late arriving event OutputTag<StreamRecord<IN>>
 	private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output") {};
 
-	private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
+	private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			createTestHarness(operator);
+
+		testHarness.setup();
+		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -134,6 +141,9 @@ public class WindowOperatorTest extends TestLogger {
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
+
+		expectedOutput.clear();
+		testHarness = createTestHarness(operator);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -160,6 +170,8 @@ public class WindowOperatorTest extends TestLogger {
 		expectedOutput.add(new Watermark(7999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.close();
 	}
 
 	@Test
@@ -187,15 +199,7 @@ public class WindowOperatorTest extends TestLogger {
 				0,
 				null /* late data output tag */);
 
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testSlidingEventTimeWindows(testHarness);
-
-		testHarness.close();
+		testSlidingEventTimeWindows(operator);
 	}
 
 	@Test
@@ -222,20 +226,16 @@ public class WindowOperatorTest extends TestLogger {
 				0,
 				null /* late data output tag */);
 
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.open();
-
-		testSlidingEventTimeWindows(testHarness);
-
-		testHarness.close();
+		testSlidingEventTimeWindows(operator);
 
 		// we close once in the rest...
 		Assert.assertEquals("Close was not called.", 2, closeCalled.get());
 	}
 
-	private void testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
+	private void testTumblingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+			createTestHarness(operator);
+
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
@@ -262,7 +262,11 @@ public class WindowOperatorTest extends TestLogger {
 
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
 		testHarness.close();
+
+		testHarness = createTestHarness(operator);
+		expectedOutput.clear();
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -293,6 +297,8 @@ public class WindowOperatorTest extends TestLogger {
 		expectedOutput.add(new Watermark(7999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+		testHarness.close();
 	}
 
 	@Test
@@ -319,14 +325,7 @@ public class WindowOperatorTest extends TestLogger {
 				0,
 				null /* late data output tag */);
 
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.open();
-
-		testTumblingEventTimeWindows(testHarness);
-
-		testHarness.close();
+		testTumblingEventTimeWindows(operator);
 	}
 
 	@Test
@@ -352,14 +351,7 @@ public class WindowOperatorTest extends TestLogger {
 				0,
 				null /* late data output tag */);
 
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.open();
-
-		testTumblingEventTimeWindows(testHarness);
-
-		testHarness.close();
+		testTumblingEventTimeWindows(operator);
 
 		// we close once in the rest...
 		Assert.assertEquals("Close was not called.", 2, closeCalled.get());
@@ -389,7 +381,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -405,7 +397,12 @@ public class WindowOperatorTest extends TestLogger {
 
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
 		testHarness.close();
+
+		testHarness = createTestHarness(operator);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -462,7 +459,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -478,7 +475,12 @@ public class WindowOperatorTest extends TestLogger {
 
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
 		testHarness.close();
+
+		testHarness = createTestHarness(operator);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -535,7 +537,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -549,6 +551,8 @@ public class WindowOperatorTest extends TestLogger {
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
+
+		testHarness = createTestHarness(operator);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -606,7 +610,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -620,6 +624,8 @@ public class WindowOperatorTest extends TestLogger {
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
+
+		testHarness = createTestHarness(operator);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -681,7 +687,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -699,6 +705,12 @@ public class WindowOperatorTest extends TestLogger {
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.close();
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+		expectedOutput.clear();
+
+		testHarness = createTestHarness(operator);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -709,8 +721,6 @@ public class WindowOperatorTest extends TestLogger {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
-
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
 
 		// add an element that merges the two "key1" sessions, they should now have count 6, and therefore fire
@@ -751,7 +761,7 @@ public class WindowOperatorTest extends TestLogger {
 			null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -776,7 +786,13 @@ public class WindowOperatorTest extends TestLogger {
 
 		// do a snapshot, close and restore again
 		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
 		testHarness.close();
+
+		expectedOutput.clear();
+		testHarness = createTestHarness(operator);
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
 		testHarness.open();
@@ -823,39 +839,45 @@ public class WindowOperatorTest extends TestLogger {
 				0,
 				null /* late data output tag */);
 
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		OperatorStateHandles snapshot;
 
-		testHarness.open();
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				createTestHarness(operator)) {
+			testHarness.open();
 
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 1000));
+			// add elements out-of-order
+			testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
+			testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 1000));
 
-		// do a snapshot, close and restore again
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.close();
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.open();
+			// do a snapshot, close and restore again
+			snapshot = testHarness.snapshot(0L, 0L);
+		}
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 2500));
+		try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+				createTestHarness(operator)) {
+			testHarness.setup();
+			testHarness.initializeState(snapshot);
+			testHarness.open();
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 33), 2500));
+			testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 2500));
 
-		testHarness.processWatermark(new Watermark(12000));
+			testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
+			testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
+			testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 33), 2500));
 
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-36", 10L, 4000L), 3999));
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-67", 0L, 3000L), 2999));
-		expectedOutput.add(new Watermark(12000));
+			testHarness.processWatermark(new Watermark(12000));
 
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+			expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-36", 10L, 4000L), 3999));
+			expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-67", 0L, 3000L), 2999));
+			expectedOutput.add(new Watermark(12000));
 
-		testHarness.close();
+			TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+		}
+	}
+
+	private static <OUT> OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, OUT> createTestHarness(OneInputStreamOperator<Tuple2<String, Integer>, OUT> operator) throws Exception {
+		return new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 	}
 
 	@Test
@@ -883,7 +905,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -969,7 +991,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1010,7 +1032,7 @@ public class WindowOperatorTest extends TestLogger {
 				0,
 				null /* late data output tag */);
 
-		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+		testHarness = createTestHarness(operator);
 
 		testHarness.setup();
 		testHarness.initializeState(snapshot);
@@ -1058,7 +1080,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1117,7 +1139,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1189,7 +1211,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+				createTestHarness(operator);
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
@@ -1255,7 +1277,7 @@ public class WindowOperatorTest extends TestLogger {
 				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1325,7 +1347,7 @@ public class WindowOperatorTest extends TestLogger {
 					null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1392,7 +1414,7 @@ public class WindowOperatorTest extends TestLogger {
 				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1457,7 +1479,7 @@ public class WindowOperatorTest extends TestLogger {
 				lateOutputTag /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1536,7 +1558,7 @@ public class WindowOperatorTest extends TestLogger {
 				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1628,7 +1650,7 @@ public class WindowOperatorTest extends TestLogger {
 				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1718,7 +1740,7 @@ public class WindowOperatorTest extends TestLogger {
 				lateOutputTag);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1805,7 +1827,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1906,7 +1928,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -1997,7 +2019,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -2089,7 +2111,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -2144,7 +2166,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -2191,7 +2213,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -2249,7 +2271,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -2295,7 +2317,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -2340,7 +2362,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 
@@ -2396,7 +2418,7 @@ public class WindowOperatorTest extends TestLogger {
 				null /* late data output tag */);
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
-			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+			createTestHarness(operator);
 
 		testHarness.open();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d7a26c3..d907df1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -754,43 +754,44 @@ public class StreamTaskTest extends TestLogger {
 		streamConfig.setStreamOperator(new BlockingCloseStreamOperator());
 		streamConfig.setOperatorID(new OperatorID());
 
-		MockEnvironment mockEnvironment = new MockEnvironment(
-			"Test Task",
-			32L * 1024L,
-			new MockInputSplitProvider(),
-			1,
-			taskConfiguration,
-			new ExecutionConfig());
-		StreamTask<Void, BlockingCloseStreamOperator> streamTask = new NoOpStreamTask<>(mockEnvironment);
-		final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
-
-		CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
-			() -> {
-				try {
-					streamTask.invoke();
-				} catch (Exception e) {
-					atomicThrowable.set(e);
-				}
-			},
-			TestingUtils.defaultExecutor());
+		try (MockEnvironment mockEnvironment = new MockEnvironment(
+				"Test Task",
+				32L * 1024L,
+				new MockInputSplitProvider(),
+				1,
+				taskConfiguration,
+				new ExecutionConfig())) {
+			StreamTask<Void, BlockingCloseStreamOperator> streamTask = new NoOpStreamTask<>(mockEnvironment);
+			final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+			CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(
+				() -> {
+					try {
+						streamTask.invoke();
+					} catch (Exception e) {
+						atomicThrowable.set(e);
+					}
+				},
+				TestingUtils.defaultExecutor());
 
-		BlockingCloseStreamOperator.IN_CLOSE.await();
+			BlockingCloseStreamOperator.IN_CLOSE.await();
 
-		// check that the StreamTask is not yet in isRunning == false
-		assertTrue(streamTask.isRunning());
+			// check that the StreamTask is not yet in isRunning == false
+			assertTrue(streamTask.isRunning());
 
-		// let the operator finish its close operation
-		BlockingCloseStreamOperator.FINISH_CLOSE.trigger();
+			// let the operator finish its close operation
+			BlockingCloseStreamOperator.FINISH_CLOSE.trigger();
 
-		// wait until the invoke is complete
-		invokeFuture.get();
+			// wait until the invoke is complete
+			invokeFuture.get();
 
-		// now the StreamTask should no longer be running
-		assertFalse(streamTask.isRunning());
+			// now the StreamTask should no longer be running
+			assertFalse(streamTask.isRunning());
 
-		// check if an exception occurred
-		if (atomicThrowable.get() != null) {
-			throw atomicThrowable.get();
+			// check if an exception occurred
+			if (atomicThrowable.get() != null) {
+				throw atomicThrowable.get();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 13f0b3f..6650118 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -71,6 +71,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.mockito.Matchers.any;
@@ -99,6 +100,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 
 	final Environment environment;
 
+	private final Optional<MockEnvironment> internalEnvironment;
+
 	CloseableRegistry closableRegistry;
 
 	// use this as default for tests
@@ -118,11 +121,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	private volatile boolean wasFailedExternally = false;
 
 	public AbstractStreamOperatorTestHarness(
-		StreamOperator<OUT> operator,
-		int maxParallelism,
-		int parallelism,
-		int subtaskIndex) throws Exception {
-
+			StreamOperator<OUT> operator,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex) throws Exception {
 		this(
 			operator,
 			new MockEnvironment(
@@ -134,12 +136,20 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 				new ExecutionConfig(),
 				maxParallelism,
 				parallelism,
-				subtaskIndex));
+				subtaskIndex),
+			true);
 	}
 
 	public AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
 			final Environment environment) throws Exception {
+		this(operator, environment, false);
+	}
+
+	private AbstractStreamOperatorTestHarness(
+			StreamOperator<OUT> operator,
+			final Environment environment,
+			boolean environmentIsInternal) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
 		this.sideOutputLists = new HashMap<>();
@@ -153,6 +163,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		this.checkpointLock = new Object();
 
 		this.environment = Preconditions.checkNotNull(environment);
+		this.internalEnvironment = environmentIsInternal ? Optional.of((MockEnvironment) environment) : Optional.empty();
 
 		mockTask = mock(StreamTask.class);
 		processingTimeService = new TestProcessingTimeService();
@@ -492,6 +503,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 			processingTimeService.shutdownService();
 		}
 		setupCalled = false;
+
+		internalEnvironment.ifPresent(MockEnvironment::close);
 	}
 
 	public void setProcessingTime(long time) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 5f17467..312891e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
@@ -42,22 +41,32 @@ import static org.mockito.Mockito.when;
 public class SourceFunctionUtil {
 
 	public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
-		final List<T> outputs = new ArrayList<T>();
-
 		if (sourceFunction instanceof RichFunction) {
+			return runRichSourceFunction(sourceFunction);
+		}
+		else {
+			return runNonRichSourceFunction(sourceFunction);
+		}
+	}
 
+	private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
+		try (MockEnvironment environment = new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024)) {
 			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
 			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
 
-			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
-					operator,
-					new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-					new HashMap<String, Accumulator<?, ?>>());
-
+			RuntimeContext runtimeContext = new StreamingRuntimeContext(
+				operator,
+				environment,
+				new HashMap<>());
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
-
 			((RichFunction) sourceFunction).open(new Configuration());
+
+			return runNonRichSourceFunction(sourceFunction);
 		}
+	}
+
+	private static <T extends Serializable> List<T> runNonRichSourceFunction(SourceFunction<T> sourceFunction) {
+		final List<T> outputs = new ArrayList<>();
 		try {
 			SourceFunction.SourceContext<T> ctx = new CollectingSourceContext<T>(new Object(), outputs);
 			sourceFunction.run(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/091a3705/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 54fe879..99a9d1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -350,50 +350,51 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			OperatorStateHandles operatorStateHandles,
 			Iterable<Long> input) throws Exception {
 
-		final MockEnvironment environment = new MockEnvironment(
-			"test task",
-			32 * 1024,
-			new MockInputSplitProvider(),
-			256,
-			taskConfiguration,
-			executionConfig,
-			16,
-			1,
-			0,
-			classLoader);
-
-		OneInputStreamOperatorTestHarness<Long, Long> harness;
-
-		if (isKeyedState) {
-			harness = new KeyedOneInputStreamOperatorTestHarness<>(
-				operator,
-				keySelector,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				environment);
-		} else {
-			harness = new OneInputStreamOperatorTestHarness<>(operator, LongSerializer.INSTANCE, environment);
-		}
+		try (final MockEnvironment environment = new MockEnvironment(
+				"test task",
+				32 * 1024,
+				new MockInputSplitProvider(),
+				256,
+				taskConfiguration,
+				executionConfig,
+				16,
+				1,
+				0,
+				classLoader)) {
+
+			OneInputStreamOperatorTestHarness<Long, Long> harness;
+
+			if (isKeyedState) {
+				harness = new KeyedOneInputStreamOperatorTestHarness<>(
+					operator,
+					keySelector,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					environment);
+			} else {
+				harness = new OneInputStreamOperatorTestHarness<>(operator, LongSerializer.INSTANCE, environment);
+			}
 
-		harness.setStateBackend(stateBackend);
+			harness.setStateBackend(stateBackend);
 
-		harness.setup();
-		harness.initializeState(operatorStateHandles);
-		harness.open();
+			harness.setup();
+			harness.initializeState(operatorStateHandles);
+			harness.open();
 
-		long timestamp = 0L;
+			long timestamp = 0L;
 
-		for (Long value : input) {
-			harness.processElement(value, timestamp++);
-		}
+			for (Long value : input) {
+				harness.processElement(value, timestamp++);
+			}
 
-		long checkpointId = 1L;
-		long checkpointTimestamp = timestamp + 1L;
+			long checkpointId = 1L;
+			long checkpointTimestamp = timestamp + 1L;
 
-		OperatorStateHandles stateHandles = harness.snapshot(checkpointId, checkpointTimestamp);
+			OperatorStateHandles stateHandles = harness.snapshot(checkpointId, checkpointTimestamp);
 
-		harness.close();
+			harness.close();
 
-		return stateHandles;
+			return stateHandles;
+		}
 	}
 
 	private static File writeSourceFile(File root, String name, String source) throws IOException {


[09/11] flink git commit: [FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest stability by using custom in memory storage

Posted by tz...@apache.org.
[FLINK-8268][streaming][tests] Improve TwoPhaseCommitSinkFunctionTest stability by using custom in memory storage


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8d1aa57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8d1aa57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8d1aa57

Branch: refs/heads/master
Commit: e8d1aa57a8246de0b78e799a02c08f4007fb3a92
Parents: 102537d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Dec 20 11:23:26 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:30:06 2018 -0800

----------------------------------------------------------------------
 .../sink/TwoPhaseCommitSinkFunctionTest.java    | 128 +++++++-----------
 .../flink/streaming/util/ContentDump.java       | 132 +++++++++++++++++++
 2 files changed, 179 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8d1aa57/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 63147a0..3123675 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.ContentDump;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.apache.log4j.AppenderSkeleton;
@@ -31,17 +32,9 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -54,12 +47,10 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -67,18 +58,15 @@ import static org.junit.Assert.fail;
  */
 public class TwoPhaseCommitSinkFunctionTest {
 
-	@Rule
-	public TemporaryFolder folder = new TemporaryFolder();
-
-	private FileBasedSinkFunction sinkFunction;
+	private ContentDumpSinkFunction sinkFunction;
 
 	private OneInputStreamOperatorTestHarness<String, Object> harness;
 
 	private AtomicBoolean throwException = new AtomicBoolean();
 
-	private File targetDirectory;
+	private ContentDump targetDirectory;
 
-	private File tmpDirectory;
+	private ContentDump tmpDirectory;
 
 	private SettableClock clock;
 
@@ -93,13 +81,22 @@ public class TwoPhaseCommitSinkFunctionTest {
 		loggingEvents = new ArrayList<>();
 		setupLogger();
 
-		targetDirectory = folder.newFolder("_target");
-		tmpDirectory = folder.newFolder("_tmp");
+		targetDirectory = new ContentDump();
+		tmpDirectory = new ContentDump();
 		clock = new SettableClock();
 
 		setUpTestHarness();
 	}
 
+	@After
+	public void tearDown() throws Exception {
+		closeTestHarness();
+		if (logger != null) {
+			logger.removeAppender(testAppender);
+		}
+		loggingEvents = null;
+	}
+
 	/**
 	 * Setup {@link org.apache.log4j.Logger}, the default logger implementation for tests,
 	 * to append {@link LoggingEvent}s to {@link #loggingEvents} so that we can assert if
@@ -131,17 +128,8 @@ public class TwoPhaseCommitSinkFunctionTest {
 		logger.setLevel(Level.WARN);
 	}
 
-	@After
-	public void tearDown() throws Exception {
-		closeTestHarness();
-		if (logger != null) {
-			logger.removeAppender(testAppender);
-		}
-		loggingEvents = null;
-	}
-
 	private void setUpTestHarness() throws Exception {
-		sinkFunction = new FileBasedSinkFunction();
+		sinkFunction = new ContentDumpSinkFunction();
 		harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
 		harness.setup();
 	}
@@ -162,7 +150,7 @@ public class TwoPhaseCommitSinkFunctionTest {
 		harness.notifyOfCompletedCheckpoint(1);
 
 		assertExactlyOnce(Arrays.asList("42", "43"));
-		assertEquals(2, tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
+		assertEquals(2, tmpDirectory.listFiles().size()); // one for checkpointId 2 and second for the currentTransaction
 	}
 
 	@Test
@@ -173,21 +161,20 @@ public class TwoPhaseCommitSinkFunctionTest {
 		harness.processElement("43", 2);
 		OperatorStateHandles snapshot = harness.snapshot(1, 3);
 
-		assertTrue(tmpDirectory.setWritable(false));
+		tmpDirectory.setWritable(false);
 		try {
 			harness.processElement("44", 4);
 			harness.snapshot(2, 5);
 			fail("something should fail");
-		}
-		catch (Exception ex) {
-			if (!(ex.getCause() instanceof FileNotFoundException)) {
+		} catch (Exception ex) {
+			if (!(ex.getCause() instanceof ContentDump.NotWritableException)) {
 				throw ex;
 			}
 			// ignore
 		}
 		closeTestHarness();
 
-		assertTrue(tmpDirectory.setWritable(true));
+		tmpDirectory.setWritable(true);
 
 		setUpTestHarness();
 		harness.initializeState(snapshot);
@@ -195,7 +182,7 @@ public class TwoPhaseCommitSinkFunctionTest {
 		assertExactlyOnce(Arrays.asList("42", "43"));
 		closeTestHarness();
 
-		assertEquals(0, tmpDirectory.listFiles().length);
+		assertEquals(0, tmpDirectory.listFiles().size());
 	}
 
 	@Test
@@ -277,88 +264,68 @@ public class TwoPhaseCommitSinkFunctionTest {
 
 	private void assertExactlyOnce(List<String> expectedValues) throws IOException {
 		ArrayList<String> actualValues = new ArrayList<>();
-		for (File file : targetDirectory.listFiles()) {
-			actualValues.addAll(Files.readAllLines(file.toPath(), Charset.defaultCharset()));
+		for (String name : targetDirectory.listFiles()) {
+			actualValues.addAll(targetDirectory.read(name));
 		}
 		Collections.sort(actualValues);
 		Collections.sort(expectedValues);
 		assertEquals(expectedValues, actualValues);
 	}
 
-	private class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
+	private class ContentDumpSinkFunction extends TwoPhaseCommitSinkFunction<String, ContentTransaction, Void> {
 
-		public FileBasedSinkFunction() {
+		public ContentDumpSinkFunction() {
 			super(
-				new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()),
+				new KryoSerializer<>(ContentTransaction.class, new ExecutionConfig()),
 				VoidSerializer.INSTANCE, clock);
-
-			if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
-				throw new IllegalArgumentException();
-			}
 		}
 
 		@Override
-		protected void invoke(FileTransaction transaction, String value, Context context) throws Exception {
-			transaction.writer.write(value);
+		protected void invoke(ContentTransaction transaction, String value, Context context) throws Exception {
+			transaction.tmpContentWriter.write(value);
 		}
 
 		@Override
-		protected FileTransaction beginTransaction() throws Exception {
-			File tmpFile = new File(tmpDirectory, UUID.randomUUID().toString());
-			return new FileTransaction(tmpFile);
+		protected ContentTransaction beginTransaction() throws Exception {
+			return new ContentTransaction(tmpDirectory.createWriter(UUID.randomUUID().toString()));
 		}
 
 		@Override
-		protected void preCommit(FileTransaction transaction) throws Exception {
-			transaction.writer.flush();
-			transaction.writer.close();
+		protected void preCommit(ContentTransaction transaction) throws Exception {
+			transaction.tmpContentWriter.flush();
+			transaction.tmpContentWriter.close();
 		}
 
 		@Override
-		protected void commit(FileTransaction transaction) {
+		protected void commit(ContentTransaction transaction) {
 			if (throwException.get()) {
 				throw new RuntimeException("Expected exception");
 			}
 
-			try {
-				Files.move(
-					transaction.tmpFile.toPath(),
-					new File(targetDirectory, transaction.tmpFile.getName()).toPath(),
-					ATOMIC_MOVE);
-			} catch (IOException e) {
-				throw new IllegalStateException(e);
-			}
+			ContentDump.move(
+				transaction.tmpContentWriter.getName(),
+				tmpDirectory,
+				targetDirectory);
 
 		}
 
 		@Override
-		protected void abort(FileTransaction transaction) {
-			try {
-				transaction.writer.close();
-			} catch (IOException e) {
-				// ignore
-			}
-			transaction.tmpFile.delete();
-		}
-
-		@Override
-		protected void recoverAndAbort(FileTransaction transaction) {
-			transaction.tmpFile.delete();
+		protected void abort(ContentTransaction transaction) {
+			transaction.tmpContentWriter.close();
+			tmpDirectory.delete(transaction.tmpContentWriter.getName());
 		}
 	}
 
-	private static class FileTransaction {
-		private final File tmpFile;
-		private final transient BufferedWriter writer;
+	private static class ContentTransaction {
+		private ContentDump.ContentWriter tmpContentWriter;
 
-		public FileTransaction(File tmpFile) throws IOException {
-			this.tmpFile = tmpFile;
-			this.writer = new BufferedWriter(new FileWriter(tmpFile));
+		public ContentTransaction(ContentDump.ContentWriter tmpContentWriter) {
+			this.tmpContentWriter = tmpContentWriter;
 		}
 
 		@Override
 		public String toString() {
-			return String.format("FileTransaction[%s]", tmpFile.getName());
+			return String.format("ContentTransaction[%s]", tmpContentWriter.getName());
 		}
 	}
 
@@ -399,5 +366,4 @@ public class TwoPhaseCommitSinkFunctionTest {
 			return Instant.ofEpochMilli(epochMilli);
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e8d1aa57/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
new file mode 100644
index 0000000..903b237
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in memory file like writes, flushes and closing.
+ */
+public class ContentDump {
+	private boolean writable = true;
+	private Map<String, List<String>> filesContent = new HashMap<>();
+
+	public Set<String> listFiles() {
+		return new HashSet<>(filesContent.keySet());
+	}
+
+	public void setWritable(boolean writable) {
+		this.writable = writable;
+	}
+
+	/**
+	 * Creates an empty file.
+	 */
+	public ContentWriter createWriter(String name) {
+		checkArgument(!filesContent.containsKey(name), "File [%s] already exists", name);
+		filesContent.put(name, new ArrayList<>());
+		return new ContentWriter(name, this);
+	}
+
+	public static void move(String name, ContentDump source, ContentDump target) {
+		Collection<String> content = source.read(name);
+		try (ContentWriter contentWriter = target.createWriter(name)) {
+			contentWriter.write(content).flush();
+		}
+		source.delete(name);
+	}
+
+	public void delete(String name) {
+		filesContent.remove(name);
+	}
+
+	public Collection<String> read(String name) {
+		List<String> content = filesContent.get(name);
+		checkState(content != null, "Unknown file [%s]", name);
+		List<String> result = new ArrayList<>(content);
+		return result;
+	}
+
+	private void putContent(String name, List<String> values) {
+		List<String> content = filesContent.get(name);
+		checkState(content != null, "Unknown file [%s]", name);
+		if (!writable) {
+			throw new NotWritableException(name);
+		}
+		content.addAll(values);
+	}
+
+	/**
+	 * {@link ContentWriter} represents an abstraction that allows to putContent to the {@link ContentDump}.
+	 */
+	public static class ContentWriter implements AutoCloseable {
+		private final ContentDump contentDump;
+		private final String name;
+		private final List<String> buffer = new ArrayList<>();
+		private boolean closed = false;
+
+		private ContentWriter(String name, ContentDump contentDump) {
+			this.name = checkNotNull(name);
+			this.contentDump = checkNotNull(contentDump);
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public ContentWriter write(String value) {
+			checkState(!closed);
+			buffer.add(value);
+			return this;
+		}
+
+		public ContentWriter write(Collection<String> values) {
+			values.forEach(this::write);
+			return this;
+		}
+
+		public ContentWriter flush() {
+			contentDump.putContent(name, buffer);
+			return this;
+		}
+
+		public void close() {
+			buffer.clear();
+			closed = true;
+		}
+	}
+
+	/**
+	 * Exception thrown for an attempt to write into read-only {@link ContentDump}.
+	 */
+	public class NotWritableException extends RuntimeException {
+		public NotWritableException(String name) {
+			super(String.format("File [%s] is not writable", name));
+		}
+	}
+}


[07/11] flink git commit: [hotfix] [kafka] Add serialVersionUID to FlinkKafkaProducer010

Posted by tz...@apache.org.
[hotfix] [kafka] Add serialVersionUID to FlinkKafkaProducer010


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/102537df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/102537df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/102537df

Branch: refs/heads/master
Commit: 102537df77b12b6541a738b95571d95b5303110b
Parents: f7a6df1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:38:32 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:19:25 2018 -0800

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer010.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/102537df/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 369ab89..e721340 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -41,6 +41,8 @@ import java.util.Properties;
  */
 public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 
+	private static final long serialVersionUID = 1L;
+
 	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
 	 */


[02/11] flink git commit: [FLINK-8116] [DataStream] Fix stale comments referring to Checkpointed interface

Posted by tz...@apache.org.
[FLINK-8116] [DataStream] Fix stale comments referring to Checkpointed interface


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac3b721b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac3b721b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac3b721b

Branch: refs/heads/master
Commit: ac3b721bf65450ab4ecd90c05d9bb3946d2e447f
Parents: 9ae4c54
Author: Ankit Parashar <an...@gmail.com>
Authored: Mon Dec 4 23:46:16 2017 +0530
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:13:48 2018 -0800

----------------------------------------------------------------------
 docs/ops/state/state_backends.md                      |  2 +-
 .../api/functions/source/SourceFunction.java          | 14 +++++++-------
 .../api/scala/StreamExecutionEnvironment.scala        |  2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac3b721b/docs/ops/state/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 4cda94f..00e3c4f 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -26,7 +26,7 @@ Programs written in the [Data Stream API]({{ site.baseurl }}/dev/datastream_api.
 
 - Windows gather elements or aggregates until they are triggered
 - Transformation functions may use the key/value state interface to store values
-- Transformation functions may implement the `Checkpointed` interface to make their local variables fault tolerant
+- Transformation functions may implement the `CheckpointedFunction` interface to make their local variables fault tolerant
 
 See also [state section]({{ site.baseurl }}/dev/stream/state/index.html) in the streaming API guide.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac3b721b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 4665cc6..cb2e15f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -34,9 +34,9 @@ import java.io.Serializable;
  * The run method can run for as long as necessary. The source must, however, react to an
  * invocation of {@link #cancel()} by breaking out of its main loop.
  *
- * <h3>Checkpointed Sources</h3>
+ * <h3>CheckpointedFunction Sources</h3>
  *
- * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
+ * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
  * interface must ensure that state checkpointing, updating of internal state and emission of
  * elements are not done concurrently. This is achieved by using the provided checkpointing lock
  * object to protect update of state and emission of elements in a synchronized block.
@@ -44,7 +44,7 @@ import java.io.Serializable;
  * <p>This is the basic pattern one should follow when implementing a (checkpointed) source:
  *
  * <pre>{@code
- *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
+ *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction {
  *      private long count = 0L;
  *      private volatile boolean isRunning = true;
  *
@@ -61,9 +61,9 @@ import java.io.Serializable;
  *          isRunning = false;
  *      }
  *
- *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
+ *      public void snapshotState(FunctionSnapshotContext context) {  }
  *
- *      public void restoreState(Long state) { this.count = state; }
+ *      public void initializeState(FunctionInitializationContext context) {  }
  * }
  * }</pre>
  *
@@ -96,12 +96,12 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 * Starts the source. Implementations can use the {@link SourceContext} emit
 	 * elements.
 	 *
-	 * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
+	 * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 	 * must lock on the checkpoint lock (using a synchronized block) before updating internal
 	 * state and emitting elements, to make both an atomic operation:
 	 *
 	 * <pre>{@code
-	 *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
+	 *  public class ExampleSource<T> implements SourceFunction<T>, CheckpointedFunction<Long> {
 	 *      private long count = 0L;
 	 *      private volatile boolean isRunning = true;
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ac3b721b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 9fd03c3..3bba505 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -229,7 +229,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * [[KeyedStream]] is maintained (heap, managed memory, externally), and where state
    * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
    * functions (implementing the interface 
-   * [[org.apache.flink.streaming.api.checkpoint.Checkpointed]].
+   * [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction]].
    *
    * <p>The [[org.apache.flink.runtime.state.memory.MemoryStateBackend]] for example
    * maintains the state in heap memory, as objects. It is lightweight without extra 


[05/11] flink git commit: [FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010

Posted by tz...@apache.org.
[FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010

This commit moves deprecated factory methods of the
FlinkKafkaProducer010 behind regular constructors, for better navigation
and readability of the code.

This closes #5179.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d42197b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d42197b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d42197b

Branch: refs/heads/master
Commit: 8d42197b662efaf58d92a3073d8c319f8a2a793e
Parents: 9f68e79
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:29:38 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 20:19:02 2018 -0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 167 +++++++++----------
 1 file changed, 82 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d42197b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 21e3a10..0e64aa5 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -46,80 +46,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 */
 	private boolean writeTimestampToKafka = false;
 
-	// ---------------------- "Constructors" for timestamp writing ------------------
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 * @param inStream The stream to write to Kafka
-	 * @param topicId ID of the Kafka topic.
-	 * @param serializationSchema User defined serialization schema supporting key/value messages
-	 * @param producerConfig Properties with the producer configuration.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 * @param inStream The stream to write to Kafka
-	 * @param topicId ID of the Kafka topic.
-	 * @param serializationSchema User defined (keyless) serialization schema.
-	 * @param producerConfig Properties with the producer configuration.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					SerializationSchema<T> serializationSchema,
-																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 *  @param inStream The stream to write to Kafka
-	 *  @param topicId The name of the target topic
-	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig,
-																					FlinkKafkaPartitioner<T> customPartitioner) {
-
-		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
-		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
-		return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
-
-	}
-
 	// ---------------------- Regular constructors------------------
 
 	/**
@@ -267,6 +193,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	// ------------------- User configuration ----------------------
+
+	/**
+	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+	 * Timestamps must be positive for Kafka to accept them.
+	 *
+	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+	 */
+	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+		this.writeTimestampToKafka = writeTimestampToKafka;
+	}
+
 	// ----------------------------- Deprecated constructors / factory methods  ---------------------------
 
 	/**
@@ -275,6 +213,76 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 *
 	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
 	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined serialization schema supporting key/value messages
+	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined (keyless) serialization schema.
+	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					SerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					FlinkKafkaPartitioner<T> customPartitioner) {
+		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
 	 *  @param inStream The stream to write to Kafka
 	 *  @param topicId The name of the target topic
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
@@ -332,17 +340,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
-	/**
-	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
-	 * Timestamps must be positive for Kafka to accept them.
-	 *
-	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
-	 */
-	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
-		this.writeTimestampToKafka = writeTimestampToKafka;
-	}
-
-
 	// ----------------------------- Generic element processing  ---------------------------
 
 	@Override