Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/06 04:25:59 UTC

[1/6] flink git commit: [hotfix] [docs] Added missing "more" word to programming-model docs.

Repository: flink
Updated Branches:
  refs/heads/release-1.3 5f4296ee8 -> 530a4d1f6


[hotfix] [docs] Added missing "more" word to programming-model docs.

This closes #4244.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7fb4ca6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7fb4ca6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7fb4ca6

Branch: refs/heads/release-1.3
Commit: b7fb4ca6af84cca91897bbea19e3e5cc4c153c15
Parents: 5f4296e
Author: adebski <an...@gmail.com>
Authored: Sun Jul 2 17:28:41 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:37 2017 +0800

----------------------------------------------------------------------
 docs/concepts/programming-model.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7fb4ca6/docs/concepts/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/concepts/programming-model.md b/docs/concepts/programming-model.md
index d83cf00..7b0cfb5 100644
--- a/docs/concepts/programming-model.md
+++ b/docs/concepts/programming-model.md
@@ -89,7 +89,7 @@ Transformations are documented in [DataStream transformations](../dev/datastream
 ## Parallel Dataflows
 
 Programs in Flink are inherently parallel and distributed. During execution, a *stream* has one or more **stream partitions**,
-and each *operator* has one or **operator subtasks**. The operator subtasks are independent of one another, and execute in different threads
+and each *operator* has one or more **operator subtasks**. The operator subtasks are independent of one another, and execute in different threads
 and possibly on different machines or containers.
 
 The number of operator subtasks is the **parallelism** of that particular operator. The parallelism of a stream
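
The corrected sentence above describes how each operator is split into one or more parallel subtasks. A minimal sketch of how that parallelism is controlled in the Java DataStream API follows; the source, the map operator and the parallelism values are illustrative and not part of this commit:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                             // default parallelism for all operators

        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        lines
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            .setParallelism(2)                             // this map operator runs as 2 parallel subtasks
            .print();                                      // the sink keeps the default parallelism of 4

        env.execute("parallelism-example");
    }
}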


[2/6] flink git commit: [FLINK-7038] [docs] Correct misused term (KeyedDataStream -> KeyedStream)

Posted by tz...@apache.org.
[FLINK-7038] [docs] Correct misused term (KeyedDataStream -> KeyedStream)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ca13f2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ca13f2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ca13f2f

Branch: refs/heads/release-1.3
Commit: 7ca13f2f8d5163c77a77f2d40048ff1f27b1ea11
Parents: b7fb4ca
Author: zhangminglei <zm...@163.com>
Authored: Sat Jul 1 17:02:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:44 2017 +0800

----------------------------------------------------------------------
 docs/dev/datastream_api.md                                       | 4 ++--
 docs/dev/scala_api_extensions.md                                 | 4 ++--
 .../java/org/apache/flink/api/common/state/AggregatingState.java | 2 +-
 .../java/org/apache/flink/api/common/state/AppendingState.java   | 2 +-
 .../java/org/apache/flink/api/common/state/FoldingState.java     | 2 +-
 .../main/java/org/apache/flink/api/common/state/ListState.java   | 2 +-
 .../main/java/org/apache/flink/api/common/state/MapState.java    | 2 +-
 .../java/org/apache/flink/api/common/state/ReducingState.java    | 2 +-
 .../src/main/java/org/apache/flink/api/common/state/State.java   | 2 +-
 .../main/java/org/apache/flink/api/common/state/ValueState.java  | 2 +-
 .../org/apache/flink/streaming/api/datastream/KeyedStream.java   | 2 +-
 .../impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala      | 2 +-
 12 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 728c945..7191d82 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -211,7 +211,7 @@ dataStream.filter(new FilterFunction<Integer>() {
           <td>
             <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
             Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how to specify keys.
-            This transformation returns a KeyedDataStream.</p>
+            This transformation returns a KeyedStream.</p>
     {% highlight java %}
 dataStream.keyBy("someKey") // Key by field "someKey"
 dataStream.keyBy(0) // Key by the first element of a Tuple
@@ -607,7 +607,7 @@ dataStream.filter { _ != 0 }
           <td>
             <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
             Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how to specify keys.
-            This transformation returns a KeyedDataStream.</p>
+            This transformation returns a KeyedStream.</p>
     {% highlight scala %}
 dataStream.keyBy("someKey") // Key by field "someKey"
 dataStream.keyBy(0) // Key by the first element of a Tuple
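
As the corrected text states, keyBy() returns a KeyedStream rather than a plain DataStream, and only a KeyedStream offers keyed operations such as reduce() and keyed state. A minimal sketch of what that return type enables, using a hypothetical tuple stream (not part of this patch):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> pairs =
    env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

// keyBy() on a field position returns KeyedStream<T, Tuple>, not a plain DataStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = pairs.keyBy(0);

// reduce() is only defined on KeyedStream; it runs independently per key
DataStream<Tuple2<String, Integer>> sums = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
        return Tuple2.of(a.f0, a.f1 + b.f1);
    }
});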

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/docs/dev/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_api_extensions.md b/docs/dev/scala_api_extensions.md
index 0e54ef1..53c0c98 100644
--- a/docs/dev/scala_api_extensions.md
+++ b/docs/dev/scala_api_extensions.md
@@ -316,7 +316,7 @@ data.keyingBy(
     </tr>
     <tr>
       <td><strong>reduceWith</strong></td>
-      <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td><strong>reduce (KeyedStream, WindowedDataStream)</strong></td>
       <td>
 {% highlight scala %}
 data.reduceWith {
@@ -327,7 +327,7 @@ data.reduceWith {
     </tr>
     <tr>
       <td><strong>foldWith</strong></td>
-      <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
+      <td><strong>fold (KeyedStream, WindowedDataStream)</strong></td>
       <td>
 {% highlight scala %}
 data.foldWith(User(bought = 0)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
index c679285..e69fdb4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  * 
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index 8ea8364..dd070a9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -29,7 +29,7 @@ import java.io.IOException;
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  * 
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
index 7e45399..b47a1a1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -28,7 +28,7 @@ import org.apache.flink.annotation.PublicEvolving;
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  * 
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 6073c6e..d3fd61e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving;
  * The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  * 
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index 0660f68..f37fddd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  *
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
index 8364fb3..25777eb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.annotation.PublicEvolving;
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  * 
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
index 86e9ec8..3cc61b8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
@@ -23,7 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
 /**
  * Interface that different types of partitioned state must implement.
  *
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index 7e42daa..ac55715 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -29,7 +29,7 @@ import java.io.IOException;
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  * 
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.
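
The javadoc hunks above all make the same point: keyed state such as ValueState is only usable inside functions applied on a KeyedStream, and the runtime scopes every read and write to the key of the current element. A minimal sketch of that usage, assuming a hypothetical per-key counter that is not part of this commit:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // the state is registered once per operator, but each access below is
        // automatically scoped to the key of the element currently being processed
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> in, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();                       // null on the first element for this key
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(Tuple2.of(in.f0, updated));
    }
}

// usable only after keying the stream, e.g. pairs.keyBy(0).flatMap(new CountPerKey())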

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 79ba0bc..77da8c7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -73,7 +73,7 @@ import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 /**
- * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
+ * A {@link KeyedStream} represents a {@link DataStream} on which operator state is
  * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
  * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
  * partitioning methods such as shuffle, forward and keyBy.

http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
index f6f153a..1d9663a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
 import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
 import org.junit.Test
 
-class OnKeyedDataStreamTest extends AcceptPFTestBase {
+class OnKeyedStreamTest extends AcceptPFTestBase {
 
   @Test
   def testReduceWithOnTuple(): Unit = {


[6/6] flink git commit: [FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits

Posted by tz...@apache.org.
[FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits

This closes #4202.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/530a4d1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/530a4d1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/530a4d1f

Branch: refs/heads/release-1.3
Commit: 530a4d1f6ca7a55933563a949393046f58b23af9
Parents: 6630dfd
Author: zhangminglei <zm...@163.com>
Authored: Wed Jun 28 08:45:12 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:10:29 2017 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/flink/api/common/io/FileInputFormat.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/530a4d1f/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 4e81dab..1c8e7ff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -501,8 +501,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 		}
 		
 
-		final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
-					(totalLength % minNumSplits == 0 ? 0 : 1));
+		final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
 
 		// now that we have the files, generate the splits
 		int splitNum = 0;
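
With the unreachable minNumSplits guard removed (the value is already validated earlier in the method, which is why the branch could never fire), the surviving expression is simply a ceiling division: totalLength / minNumSplits rounded up whenever the length does not divide evenly. A small worked sketch with hypothetical numbers:

// the surviving expression, with hypothetical values
long totalLength = 10;     // total bytes across all matched input files
int minNumSplits = 3;      // requested minimum number of splits (>= 1 by this point)

long maxSplitSize = totalLength / minNumSplits
        + (totalLength % minNumSplits == 0 ? 0 : 1);       // 10 / 3 = 3, remainder != 0, so 3 + 1 = 4

// three splits of at most 4 bytes each (4 + 4 + 2) cover all 10 bytes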


[3/6] flink git commit: [FLINK-7038] Correct misused terms (WindowedDataStream, JoinedDataStream)

Posted by tz...@apache.org.
[FLINK-7038] Correct misused terms (WindowedDataStream, JoinedDataStream)

This closes #4229.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9a0b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9a0b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9a0b96

Branch: refs/heads/release-1.3
Commit: 0c9a0b96212d7d1549e4d66b23ddbfd982b19b1d
Parents: 7ca13f2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Jul 4 19:12:48 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:51 2017 +0800

----------------------------------------------------------------------
 docs/dev/scala_api_extensions.md                |  8 +-
 .../StreamingScalaAPICompletenessTest.scala     |  4 +-
 .../OnJoinedDataStreamTest.scala                | 67 --------------
 .../OnJoinedStreamTest.scala                    | 67 ++++++++++++++
 .../OnWindowedDataStreamTest.scala              | 97 --------------------
 .../OnWindowedStreamTest.scala                  | 97 ++++++++++++++++++++
 6 files changed, 170 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/docs/dev/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_api_extensions.md b/docs/dev/scala_api_extensions.md
index 53c0c98..283f50b 100644
--- a/docs/dev/scala_api_extensions.md
+++ b/docs/dev/scala_api_extensions.md
@@ -316,7 +316,7 @@ data.keyingBy(
     </tr>
     <tr>
       <td><strong>reduceWith</strong></td>
-      <td><strong>reduce (KeyedStream, WindowedDataStream)</strong></td>
+      <td><strong>reduce (KeyedStream, WindowedStream)</strong></td>
       <td>
 {% highlight scala %}
 data.reduceWith {
@@ -327,7 +327,7 @@ data.reduceWith {
     </tr>
     <tr>
       <td><strong>foldWith</strong></td>
-      <td><strong>fold (KeyedStream, WindowedDataStream)</strong></td>
+      <td><strong>fold (KeyedStream, WindowedStream)</strong></td>
       <td>
 {% highlight scala %}
 data.foldWith(User(bought = 0)) {
@@ -338,7 +338,7 @@ data.foldWith(User(bought = 0)) {
     </tr>
     <tr>
       <td><strong>applyWith</strong></td>
-      <td><strong>apply (WindowedDataStream)</strong></td>
+      <td><strong>apply (WindowedStream)</strong></td>
       <td>
 {% highlight scala %}
 data.applyWith(0)(
@@ -350,7 +350,7 @@ data.applyWith(0)(
     </tr>
     <tr>
       <td><strong>projecting</strong></td>
-      <td><strong>apply (JoinedDataStream)</strong></td>
+      <td><strong>apply (JoinedStream)</strong></td>
       <td>
 {% highlight scala %}
 data1.join(data2).

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 7cf6935..d8737e1 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -48,8 +48,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
 
-      "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
-      "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
+      "org.apache.flink.streaming.api.datastream.WindowedStream.getType",
+      "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionConfig",
 
       "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
       "org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
deleted file mode 100644
index 34c55d7..0000000
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
-import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.junit.Test
-
-class OnJoinedDataStreamTest extends AcceptPFTestBase {
-
-  @Test
-  def testProjectingOnTuple(): Unit = {
-    val test =
-      tuples.join(tuples).
-        where {
-          case (id, _) => id
-        }.equalTo {
-          case (id, _) => id
-        }.window {
-          TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
-        }.projecting {
-          case ((_, v1), (_, v2)) => s"$v1 $v2"
-        }
-    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-      "projecting should produce a SingleOutputStreamOperator")
-  }
-
-  @Test
-  def testProjectingOnCaseClass(): Unit = {
-    val test =
-      caseObjects.join(caseObjects).
-      where {
-        case KeyValuePair(id, _) => id
-      }.equalTo {
-        case KeyValuePair(id, _) => id
-      }.window {
-        TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
-      }.projecting {
-        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
-      }
-   assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-     "projecting should produce a SingleOutputStreamOperator")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
new file mode 100644
index 0000000..2069997
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.Test
+
+class OnJoinedStreamTest extends AcceptPFTestBase {
+
+  @Test
+  def testProjectingOnTuple(): Unit = {
+    val test =
+      tuples.join(tuples).
+        where {
+          case (id, _) => id
+        }.equalTo {
+          case (id, _) => id
+        }.window {
+          TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+        }.projecting {
+          case ((_, v1), (_, v2)) => s"$v1 $v2"
+        }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "projecting should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testProjectingOnCaseClass(): Unit = {
+    val test =
+      caseObjects.join(caseObjects).
+      where {
+        case KeyValuePair(id, _) => id
+      }.equalTo {
+        case KeyValuePair(id, _) => id
+      }.window {
+        TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+      }.projecting {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+      }
+   assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+     "projecting should produce a SingleOutputStreamOperator")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
deleted file mode 100644
index 4fa9f5a..0000000
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
-import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
-import org.junit.Test
-
-class OnWindowedDataStreamTest extends AcceptPFTestBase {
-
-  @Test
-  def testReduceWithOnTuple(): Unit = {
-    val test =
-      windowedTuples.reduceWith {
-        case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
-      }
-    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-      "reduceWith should produce a SingleOutputStreamOperator")
-  }
-
-  @Test
-  def testReduceWithOnCaseClass(): Unit = {
-    val test =
-      windowedCaseObjects.reduceWith {
-        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
-      }
-    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-      "reduceWith should produce a SingleOutputStreamOperator")
-  }
-
-  @Test
-  def testFoldWithOnTuple(): Unit = {
-    val test =
-      windowedTuples.foldWith("") {
-        case (folding, (_, value)) => s"$folding $value"
-      }
-    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-      "foldWith should produce a SingleOutputStreamOperator")
-  }
-
-  @Test
-  def testFoldWithOnCaseClass(): Unit = {
-    val test =
-      windowedCaseObjects.foldWith("") {
-        case (folding, KeyValuePair(_, value)) => s"$folding $value"
-      }
-    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-      "foldWith should produce a SingleOutputStreamOperator")
-  }
-
-  @Test
-  def testApplyWithOnTuple(): Unit = {
-    val test =
-      windowedTuples.applyWith("")(
-        foldFunction = {
-          case (folding, (_, value)) => s"$folding $value"
-        },
-        windowFunction = {
-          case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
-        })
-    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-      "applyWith should produce a SingleOutputStreamOperator")
-  }
-
-  @Test
-  def testApplyWithOnCaseClass(): Unit = {
-    val test =
-      windowedCaseObjects.applyWith("")(
-        foldFunction = {
-          case (folding, KeyValuePair(_, value)) => s"$folding $value"
-        },
-        windowFunction = {
-          case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
-        })
-    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
-      "applyWith should produce a SingleOutputStreamOperator")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
new file mode 100644
index 0000000..923b869
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnWindowedStreamTest extends AcceptPFTestBase {
+
+  @Test
+  def testReduceWithOnTuple(): Unit = {
+    val test =
+      windowedTuples.reduceWith {
+        case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "reduceWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testReduceWithOnCaseClass(): Unit = {
+    val test =
+      windowedCaseObjects.reduceWith {
+        case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "reduceWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFoldWithOnTuple(): Unit = {
+    val test =
+      windowedTuples.foldWith("") {
+        case (folding, (_, value)) => s"$folding $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "foldWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testFoldWithOnCaseClass(): Unit = {
+    val test =
+      windowedCaseObjects.foldWith("") {
+        case (folding, KeyValuePair(_, value)) => s"$folding $value"
+      }
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "foldWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testApplyWithOnTuple(): Unit = {
+    val test =
+      windowedTuples.applyWith("")(
+        foldFunction = {
+          case (folding, (_, value)) => s"$folding $value"
+        },
+        windowFunction = {
+          case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
+        })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "applyWith should produce a SingleOutputStreamOperator")
+  }
+
+  @Test
+  def testApplyWithOnCaseClass(): Unit = {
+    val test =
+      windowedCaseObjects.applyWith("")(
+        foldFunction = {
+          case (folding, KeyValuePair(_, value)) => s"$folding $value"
+        },
+        windowFunction = {
+          case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
+        })
+    assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+      "applyWith should produce a SingleOutputStreamOperator")
+  }
+
+}


[4/6] flink git commit: [FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader

Posted by tz...@apache.org.
[FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader

This closes #4232.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78bbb844
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78bbb844
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78bbb844

Branch: refs/heads/release-1.3
Commit: 78bbb844a0b33d4297e9fa1044401a51e219fcfa
Parents: 0c9a0b9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Jun 30 11:19:30 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:57 2017 +0800

----------------------------------------------------------------------
 .../executiongraph/ExecutionGraphBuilder.java   | 16 +++---
 .../tasks/JobCheckpointingSettings.java         |  8 +--
 .../CheckpointSettingsSerializableTest.java     | 60 +++++++++++++++++++-
 .../tasks/JobCheckpointingSettingsTest.java     |  6 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 18 +++++-
 5 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index db22da6..7cfc6e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -224,19 +224,21 @@ public class ExecutionGraphBuilder {
 			// if specified in the application, use from there, otherwise load from configuration
 			final StateBackend metadataBackend;
 
-			final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
+			final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
 			if (applicationConfiguredBackend != null) {
-				metadataBackend = applicationConfiguredBackend;
+				try {
+					metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
+				} catch (IOException | ClassNotFoundException e) {
+					throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
+				}
 
 				log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
-						applicationConfiguredBackend);
-			}
-			else {
+					metadataBackend);
+			} else {
 				try {
 					metadataBackend = AbstractStateBackend
 							.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
-				}
-				catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
+				} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
 					throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
index a30a2ba..cc97e1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -56,7 +56,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 
 	/** The default state backend, if configured by the user in the job */
 	@Nullable
-	private final StateBackend defaultStateBackend;
+	private final SerializedValue<StateBackend> defaultStateBackend;
 
 	/** (Factories for) hooks that are executed on the checkpoint coordinator */
 	@Nullable
@@ -80,7 +80,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
 			ExternalizedCheckpointSettings externalizedCheckpointSettings,
-			@Nullable StateBackend defaultStateBackend,
+			@Nullable SerializedValue<StateBackend> defaultStateBackend,
 			boolean isExactlyOnce) {
 
 		this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm,
@@ -97,7 +97,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
 			ExternalizedCheckpointSettings externalizedCheckpointSettings,
-			@Nullable StateBackend defaultStateBackend,
+			@Nullable SerializedValue<StateBackend> defaultStateBackend,
 			@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks,
 			boolean isExactlyOnce) {
 
@@ -155,7 +155,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 	}
 
 	@Nullable
-	public StateBackend getDefaultStateBackend() {
+	public SerializedValue<StateBackend> getDefaultStateBackend() {
 		return defaultStateBackend;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 0246180..f597757 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import java.io.IOException;
+import javax.annotation.Nullable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -31,6 +35,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -43,6 +53,7 @@ import java.net.URLClassLoader;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -53,7 +64,7 @@ import static org.mockito.Mockito.when;
 public class CheckpointSettingsSerializableTest extends TestLogger {
 
 	@Test
-	public void testClassLoaderForCheckpointHooks() throws Exception {
+	public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception {
 		final ClassLoader classLoader = new URLClassLoader(new URL[0], getClass().getClassLoader());
 		final Serializable outOfClassPath = CommonTestUtils.createObjectForClassNotInClassPath(classLoader);
 
@@ -70,7 +81,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 				0L,
 				1,
 				ExternalizedCheckpointSettings.none(),
-				null,
+				new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
 				serHooks,
 				true);
 
@@ -97,6 +108,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 				log);
 
 		assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
+		assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);
 	}
 
 	// ------------------------------------------------------------------------
@@ -119,4 +131,48 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 			return hook;
 		}
 	}
+
+	private static final class CustomStateBackend implements StateBackend {
+
+		/**
+		 * Simulate a custom option that is not in the normal classpath.
+		 */
+		private Serializable customOption;
+
+		public CustomStateBackend(Serializable customOption) {
+			this.customOption = customOption;
+		}
+
+		@Override
+		public CheckpointStreamFactory createStreamFactory(
+			JobID jobId, String operatorIdentifier) throws IOException {
+			return null;
+		}
+
+		@Override
+		public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException {
+			return null;
+		}
+
+		@Override
+		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws Exception {
+			return null;
+		}
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(
+			Environment env, String operatorIdentifier) throws Exception {
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index c3524fa..097c296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -45,7 +47,7 @@ public class JobCheckpointingSettingsTest {
 			112,
 			12,
 			ExternalizedCheckpointSettings.externalizeCheckpoints(true),
-			new MemoryStateBackend(),
+			new SerializedValue<StateBackend>(new MemoryStateBackend()),
 			false);
 
 		JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
@@ -60,6 +62,6 @@ public class JobCheckpointingSettingsTest {
 		assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
 		assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
 		assertNotNull(copy.getDefaultStateBackend());
-		assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
+		assertTrue(copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass() == MemoryStateBackend.class);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6d1af72..c807344 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -660,6 +661,21 @@ public class StreamingJobGraphGenerator {
 			}
 		}
 
+		// because the state backend can have user-defined code, it needs to be stored as
+		// eagerly serialized value
+		final SerializedValue<StateBackend> serializedStateBackend;
+		if (streamGraph.getStateBackend() == null) {
+			serializedStateBackend = null;
+		} else {
+			try {
+				serializedStateBackend =
+					new SerializedValue<StateBackend>(streamGraph.getStateBackend());
+			}
+			catch (IOException e) {
+				throw new FlinkRuntimeException("State backend is not serializable", e);
+			}
+		}
+
 		//  --- done, put it all together ---
 
 		JobCheckpointingSettings settings = new JobCheckpointingSettings(
@@ -667,7 +683,7 @@ public class StreamingJobGraphGenerator {
 				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
 				externalizedCheckpointSettings,
-				streamGraph.getStateBackend(),
+				serializedStateBackend,
 				serializedHooks,
 				isExactlyOnce);
 


[5/6] flink git commit: [FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010

Posted by tz...@apache.org.
[FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010

This closes #4206.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6630dfdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6630dfdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6630dfdd

Branch: refs/heads/release-1.3
Commit: 6630dfdd748dee9c2fa6a0993497dcf3468a0948
Parents: 78bbb84
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Jun 26 11:28:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:10:18 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java |  17 +-
 .../kafka/Kafka010ProducerITCase.java           |  13 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  37 +++
 .../connectors/kafka/Kafka08ProducerITCase.java |  13 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  36 +++
 .../connectors/kafka/Kafka09ProducerITCase.java |  14 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  36 +++
 .../connectors/kafka/KafkaConsumerTestBase.java |  15 +-
 .../connectors/kafka/KafkaProducerTestBase.java | 253 ++++++++++++++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  16 ++
 10 files changed, 420 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 711fe07..7909ba6 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -24,6 +24,9 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -66,7 +69,7 @@ import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase
  *
  * All methods and constructors in this class are marked with the approach they are needed for.
  */
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
 
 	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -418,6 +421,18 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		invokeInternal(element.getValue(), element.getTimestamp());
 	}
 
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.initializeState(context);
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.snapshotState(context);
+	}
+
 	/**
 	 * Configuration object returned by the writeToKafkaWithTimestamps() call.
 	 */

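With the change above, FlinkKafkaProducer010 forwards initializeState/snapshotState to the wrapped FlinkKafkaProducerBase, so pending records are flushed on checkpoints. A hedged usage sketch of the regular-sink mode with flush-on-checkpoint enabled; the broker address, topic name, and example job are assumptions, not taken from the commit.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class AtLeastOnceProducerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoints are what trigger the flush of pending Kafka records
        env.enableCheckpointing(500);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // assumed broker address

        FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
                "output-topic",                  // assumed topic name
                new SimpleStringSchema(),
                props);
        // flush buffered records in snapshotState() instead of losing them on failure
        producer.setFlushOnCheckpoint(true);

        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("at-least-once Kafka sink sketch");
    }
}
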
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index 42b9682..f811893 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -18,16 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
-import org.junit.Test;
-
-
+/**
+ * IT cases for the {@link FlinkKafkaProducer010}.
+ */
 @SuppressWarnings("serial")
 public class Kafka010ProducerITCase extends KafkaProducerTestBase {
-
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c88c858..3094172 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -25,8 +25,10 @@ import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -35,6 +37,8 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSeri
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -46,7 +50,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -116,6 +123,31 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
+
+			// wait for new records with timeout and break the loop if we didn't get any
+			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+			while (iterator.hasNext()) {
+				ConsumerRecord<K, V> record = iterator.next();
+				result.add(record);
+				processedAtLeastOneRecord = true;
+			}
+
+			if (!processedAtLeastOneRecord) {
+				break;
+			}
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
 	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
@@ -131,6 +163,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props);
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 5c951db..6710a77 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
-import org.junit.Test;
-
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
+	@Override
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
 	}
 
+	@Override
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		// Disable this test since FlinkKafkaProducer08 doesn't support the custom operator mode
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 2419b53..c0e6939 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -25,12 +25,14 @@ import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -41,6 +43,12 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSeri
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.Seq;
@@ -50,8 +58,10 @@ import java.io.IOException;
 import java.net.BindException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -105,6 +115,27 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.subscribe(new TopicPartition(topic, partition));
+
+		while (true) {
+			Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
+			if (topics == null || !topics.containsKey(topic)) {
+				break;
+			}
+			List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
+			result.addAll(records);
+			if (records.size() == 0) {
+				break;
+			}
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
 	public <T> StreamSink<T> getProducerSink(
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
@@ -127,6 +158,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index ae4f5b2..847f818 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -18,15 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
-import org.junit.Test;
-
+/**
+ * IT cases for the {@link FlinkKafkaProducer09}.
+ */
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
+	@Override
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		// Disable this test since FlinkKafkaProducer09 doesn't support the custom operator mode
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 84fdbf8..3149308 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -27,8 +27,10 @@ import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -37,6 +39,8 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSeri
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -48,7 +52,10 @@ import scala.collection.Seq;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -102,6 +109,30 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
+
+			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+			while (iterator.hasNext()) {
+				ConsumerRecord<K, V> record = iterator.next();
+				result.add(record);
+				processedAtLeastOneRecord = true;
+			}
+
+			if (!processedAtLeastOneRecord) {
+				break;
+			}
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
 	public <T> StreamSink<T> getProducerSink(
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
@@ -120,6 +151,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 285bcf9..1de8e6f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -788,7 +788,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DataGenerators.generateRandomizedIntegerSequence(
 				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
-				topic, parallelism, numElementsPerPartition, true);
+				topic,
+				parallelism,
+				numElementsPerPartition,
+				true);
 
 		// run the topology that fails and recovers
 
@@ -837,7 +840,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DataGenerators.generateRandomizedIntegerSequence(
 				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
-				topic, numPartitions, numElementsPerPartition, false);
+				topic,
+				numPartitions,
+				numElementsPerPartition,
+				true);
 
 		// run the topology that fails and recovers
 
@@ -885,7 +891,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DataGenerators.generateRandomizedIntegerSequence(
 				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
-				topic, numPartitions, numElementsPerPartition, true);
+				topic,
+				numPartitions,
+				numElementsPerPartition,
+				true);
 
 		// run the topology that fails and recovers
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index bcc8328..dc76249 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -18,26 +18,47 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.collect.ImmutableSet;
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Test;
+
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -74,7 +95,8 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 * Each topic also has a final sink that validates that there are no duplicates and that all
 	 * partitions are present.
 	 */
-	public void runCustomPartitioningTest() {
+	@Test
+	public void testCustomPartitioning() {
 		try {
 			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
 
@@ -130,7 +152,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			props.putAll(secureProps);
 
 			// sink partitions into
-			kafkaServer.produceIntoKafka(stream, defaultTopic,
+			kafkaServer.produceIntoKafka(
+					stream,
+					defaultTopic,
 					// this serialization schema will route between the default topic and dynamic topic
 					new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
 					props,
@@ -169,6 +193,145 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 		}
 	}
 
+	/**
+	 * Tests the at-least-once semantic for simple writes into Kafka using the regular sink.
+	 */
+	@Test
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		testOneToOneAtLeastOnce(true);
+	}
+
+	/**
+	 * Tests the at-least-once semantic for writes into Kafka using the custom sink operator.
+	 */
+	@Test
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		testOneToOneAtLeastOnce(false);
+	}
+
+	/**
+	 * This test sets KafkaProducer so that it will not automatically flush the data
+	 * and fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
+	 */
+	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
+		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
+		final int partition = 0;
+		final int numElements = 1000;
+		final int failAfterElements = 333;
+
+		createTestTopic(topic, 1, 1);
+
+		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
+		properties.setProperty("timeout.ms", "10000");
+		properties.setProperty("max.block.ms", "10000");
+		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
+		properties.setProperty("batch.size", "10240000");
+		properties.setProperty("linger.ms", "10000");
+
+		int leaderId = kafkaServer.getLeaderToShutDown(topic);
+		BrokerRestartingMapper.resetState();
+
+		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
+		DataStream<Integer> inputStream = env
+			.fromCollection(getIntegersSequence(numElements))
+			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+
+		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+			@Override
+			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return partition;
+			}
+		});
+
+		if (regularSink) {
+			inputStream.addSink(kafkaSink.getUserFunction());
+		}
+		else {
+			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+				@Override
+				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+					return partition;
+				}
+			});
+		}
+
+		FailingIdentityMapper.failedBefore = false;
+		try {
+			env.execute("One-to-one at least once test");
+			fail("Job should fail!");
+		}
+		catch (JobExecutionException ex) {
+			assertEquals("Broker was shutdown!", ex.getCause().getMessage());
+		}
+
+		kafkaServer.restartBroker(leaderId);
+
+		// assert that before failure we successfully snapshot/flushed all expected elements
+		assertAtLeastOnceForTopic(
+				properties,
+				topic,
+				partition,
+				ImmutableSet.copyOf(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot)),
+				30000L);
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * We handle the timeout manually instead of using JUnit's timeout, so that missing records show up as an assertion failure rather than a timeout error.
+	 * After the timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 */
+	private void assertAtLeastOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			Set<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		Set<Integer> actualElements = new HashSet<>();
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got all expectedElements
+			if (actualElements.containsAll(expectedElements)) {
+				return;
+			}
+		}
+
+		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+	}
+
+	private List<Integer> getIntegersSequence(int size) {
+		List<Integer> result = new ArrayList<>(size);
+		for (int i = 0; i < size; i++) {
+			result.add(i);
+		}
+		return result;
+	}
+
 	// ------------------------------------------------------------------------
 
 	public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
@@ -263,4 +426,90 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			}
 		}
 	}
+
+	private static class BrokerRestartingMapper<T> extends RichMapFunction<T, T>
+		implements CheckpointedFunction, CheckpointListener {
+
+		private static final long serialVersionUID = 6334389850158707313L;
+
+		public static volatile boolean restartedLeaderBefore;
+		public static volatile boolean hasBeenCheckpointedBeforeFailure;
+		public static volatile int numElementsBeforeSnapshot;
+
+		private final int shutdownBrokerId;
+		private final int failCount;
+		private int numElementsTotal;
+
+		private boolean failer;
+		private boolean hasBeenCheckpointed;
+
+		public static void resetState() {
+			restartedLeaderBefore = false;
+			hasBeenCheckpointedBeforeFailure = false;
+			numElementsBeforeSnapshot = 0;
+		}
+
+		public BrokerRestartingMapper(int shutdownBrokerId, int failCount) {
+			this.shutdownBrokerId = shutdownBrokerId;
+			this.failCount = failCount;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		}
+
+		@Override
+		public T map(T value) throws Exception {
+			numElementsTotal++;
+
+			if (!restartedLeaderBefore) {
+				Thread.sleep(10);
+
+				if (failer && numElementsTotal >= failCount) {
+					// shut down a Kafka broker
+					KafkaServer toShutDown = null;
+					for (KafkaServer server : kafkaServer.getBrokers()) {
+
+						if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
+							toShutDown = server;
+							break;
+						}
+					}
+
+					if (toShutDown == null) {
+						StringBuilder listOfBrokers = new StringBuilder();
+						for (KafkaServer server : kafkaServer.getBrokers()) {
+							listOfBrokers.append(kafkaServer.getBrokerId(server));
+							listOfBrokers.append(" ; ");
+						}
+
+						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+												+ " ; available brokers: " + listOfBrokers.toString());
+					} else {
+						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+						restartedLeaderBefore = true;
+						toShutDown.shutdown();
+						toShutDown.awaitShutdown();
+						throw new Exception("Broker was shutdown!");
+					}
+				}
+			}
+			return value;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			hasBeenCheckpointed = true;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			numElementsBeforeSnapshot = numElementsTotal;
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+		}
+	}
 }

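The new at-least-once tests above shut a broker down mid-job and then assert that every element snapshotted before the failure can be read back from the topic. The property they rely on is that a checkpointed sink flushes its buffered output in snapshotState. A minimal, hypothetical sketch of that contract; BufferingSink and its flush target are illustrative and not code from the commit.

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final List<String> pending = new ArrayList<>();

    @Override
    public void invoke(String value) throws Exception {
        // buffer instead of writing immediately (analogous to the batched KafkaProducer)
        pending.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // flush everything that is still buffered before the checkpoint completes;
        // this is what makes the sink at-least-once
        flush(pending);
        pending.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to restore in this sketch
    }

    private void flush(List<String> records) {
        // stand-in for the actual write to the external system
        records.forEach(System.out::println);
    }
}
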
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 311a1a4..7b2a42d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -31,6 +31,10 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.Collection;
+
 /**
  * Abstract class providing a Kafka test environment
  */
@@ -79,6 +83,12 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
 
+	public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			long timeout);
+
 	public abstract <T> StreamSink<T> getProducerSink(String topic,
 			KeyedSerializationSchema<T> serSchema, Properties props,
 			FlinkKafkaPartitioner<T> partitioner);
@@ -87,6 +97,12 @@ public abstract class KafkaTestEnvironment {
 														KeyedSerializationSchema<T> serSchema, Properties props,
 														FlinkKafkaPartitioner<T> partitioner);
 
+	public abstract <T> DataStreamSink<T> writeToKafkaWithTimestamps(
+			DataStream<T> stream,
+			String topic,
+			KeyedSerializationSchema<T> serSchema,
+			Properties props);
+
 	// -- offset handlers
 
 	public interface KafkaOffsetHandler {