Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/06 04:25:59 UTC
[1/6] flink git commit: [hotfix] [docs] Added missing "more" word to programming-model docs.
Repository: flink
Updated Branches:
refs/heads/release-1.3 5f4296ee8 -> 530a4d1f6
[hotfix] [docs] Added missing "more" word to programming-model docs.
This closes #4244.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7fb4ca6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7fb4ca6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7fb4ca6
Branch: refs/heads/release-1.3
Commit: b7fb4ca6af84cca91897bbea19e3e5cc4c153c15
Parents: 5f4296e
Author: adebski <an...@gmail.com>
Authored: Sun Jul 2 17:28:41 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:37 2017 +0800
----------------------------------------------------------------------
docs/concepts/programming-model.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7fb4ca6/docs/concepts/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/concepts/programming-model.md b/docs/concepts/programming-model.md
index d83cf00..7b0cfb5 100644
--- a/docs/concepts/programming-model.md
+++ b/docs/concepts/programming-model.md
@@ -89,7 +89,7 @@ Transformations are documented in [DataStream transformations](../dev/datastream
## Parallel Dataflows
Programs in Flink are inherently parallel and distributed. During execution, a *stream* has one or more **stream partitions**,
-and each *operator* has one or **operator subtasks**. The operator subtasks are independent of one another, and execute in different threads
+and each *operator* has one or more **operator subtasks**. The operator subtasks are independent of one another, and execute in different threads
and possibly on different machines or containers.
The number of operator subtasks is the **parallelism** of that particular operator. The parallelism of a stream
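For readers following along: the parallelism described in this passage is set per operator in user code. A minimal sketch, assuming a local execution environment (the class name, data, and parallelism value are illustrative, not part of the patch):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> words = env.fromElements("a", "b", "c");

            // Each operator runs as one or more parallel subtasks;
            // setParallelism controls how many subtasks this map gets.
            words.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            }).setParallelism(4)
              .print();

            env.execute("parallelism-example");
        }
    }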
[2/6] flink git commit: [FLINK-7038] [docs] Correct misused term (KeyedDataStream -> KeyedStream)
Posted by tz...@apache.org.
[FLINK-7038] [docs] Correct misused term (KeyedDataStream -> KeyedStream)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ca13f2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ca13f2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ca13f2f
Branch: refs/heads/release-1.3
Commit: 7ca13f2f8d5163c77a77f2d40048ff1f27b1ea11
Parents: b7fb4ca
Author: zhangminglei <zm...@163.com>
Authored: Sat Jul 1 17:02:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:44 2017 +0800
----------------------------------------------------------------------
docs/dev/datastream_api.md | 4 ++--
docs/dev/scala_api_extensions.md | 4 ++--
.../java/org/apache/flink/api/common/state/AggregatingState.java | 2 +-
.../java/org/apache/flink/api/common/state/AppendingState.java | 2 +-
.../java/org/apache/flink/api/common/state/FoldingState.java | 2 +-
.../main/java/org/apache/flink/api/common/state/ListState.java | 2 +-
.../main/java/org/apache/flink/api/common/state/MapState.java | 2 +-
.../java/org/apache/flink/api/common/state/ReducingState.java | 2 +-
.../src/main/java/org/apache/flink/api/common/state/State.java | 2 +-
.../main/java/org/apache/flink/api/common/state/ValueState.java | 2 +-
.../org/apache/flink/streaming/api/datastream/KeyedStream.java | 2 +-
.../impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala | 2 +-
12 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 728c945..7191d82 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -211,7 +211,7 @@ dataStream.filter(new FilterFunction<Integer>() {
<td>
<p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how to specify keys.
- This transformation returns a KeyedDataStream.</p>
+ This transformation returns a KeyedStream.</p>
{% highlight java %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
@@ -607,7 +607,7 @@ dataStream.filter { _ != 0 }
<td>
<p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how to specify keys.
- This transformation returns a KeyedDataStream.</p>
+ This transformation returns a KeyedStream.</p>
{% highlight scala %}
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/docs/dev/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_api_extensions.md b/docs/dev/scala_api_extensions.md
index 0e54ef1..53c0c98 100644
--- a/docs/dev/scala_api_extensions.md
+++ b/docs/dev/scala_api_extensions.md
@@ -316,7 +316,7 @@ data.keyingBy(
</tr>
<tr>
<td><strong>reduceWith</strong></td>
- <td><strong>reduce (KeyedDataStream, WindowedDataStream)</strong></td>
+ <td><strong>reduce (KeyedStream, WindowedDataStream)</strong></td>
<td>
{% highlight scala %}
data.reduceWith {
@@ -327,7 +327,7 @@ data.reduceWith {
</tr>
<tr>
<td><strong>foldWith</strong></td>
- <td><strong>fold (KeyedDataStream, WindowedDataStream)</strong></td>
+ <td><strong>fold (KeyedStream, WindowedDataStream)</strong></td>
<td>
{% highlight scala %}
data.foldWith(User(bought = 0)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
index c679285..e69fdb4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
* <p>The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index 8ea8364..dd070a9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -29,7 +29,7 @@ import java.io.IOException;
* <p>The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
index 7e45399..b47a1a1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -28,7 +28,7 @@ import org.apache.flink.annotation.PublicEvolving;
* <p>The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 6073c6e..d3fd61e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving;
* The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index 0660f68..f37fddd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -30,7 +30,7 @@ import java.util.Map;
* <p>The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
index 8364fb3..25777eb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.annotation.PublicEvolving;
* <p>The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
index 86e9ec8..3cc61b8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
@@ -23,7 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
/**
* Interface that different types of partitioned state must implement.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index 7e42daa..ac55715 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -29,7 +29,7 @@ import java.io.IOException;
* <p>The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots.
*
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 79ba0bc..77da8c7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -73,7 +73,7 @@ import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
/**
- * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
+ * A {@link KeyedStream} represents a {@link DataStream} on which operator state is
* partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
* {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
* partitioning methods such as shuffle, forward and keyBy.
http://git-wip-us.apache.org/repos/asf/flink/blob/7ca13f2f/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
index f6f153a..1d9663a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
import org.junit.Test
-class OnKeyedDataStreamTest extends AcceptPFTestBase {
+class OnKeyedStreamTest extends AcceptPFTestBase {
@Test
def testReduceWithOnTuple(): Unit = {
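As the corrected javadocs say, this kind of partitioned state is only accessible from functions applied on a KeyedStream. A minimal keyed-state sketch for illustration; the class and state names are made up, not part of the patch:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts events per key; valid only after keyBy(), i.e. on a KeyedStream:
    //   stream.keyBy(0).flatMap(new CountPerKey());
    public class CountPerKey
            extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // The state is scoped to the key of the current element,
            // which the system supplies automatically.
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Tuple2<String, Long> in,
                Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = count.value(); // null on first access for this key
            long next = (current == null ? 0L : current) + 1;
            count.update(next);
            out.collect(Tuple2.of(in.f0, next));
        }
    }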
[6/6] flink git commit: [FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits
Posted by tz...@apache.org.
[FLINK-6422] [core] Unreachable code in FileInputFormat#createInputSplits
This closes #4202.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/530a4d1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/530a4d1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/530a4d1f
Branch: refs/heads/release-1.3
Commit: 530a4d1f6ca7a55933563a949393046f58b23af9
Parents: 6630dfd
Author: zhangminglei <zm...@163.com>
Authored: Wed Jun 28 08:45:12 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:10:29 2017 +0800
----------------------------------------------------------------------
.../main/java/org/apache/flink/api/common/io/FileInputFormat.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/530a4d1f/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 4e81dab..1c8e7ff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -501,8 +501,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
}
- final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
- (totalLength % minNumSplits == 0 ? 0 : 1));
+ final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
// now that we have the files, generate the splits
int splitNum = 0;
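The expression that survives is plain integer ceiling division; the removed branch guarded minNumSplits < 1, which createInputSplits already rejects up front, hence the unreachable code. A standalone restatement of the arithmetic (names illustrative):

    class SplitSizing {
        // Integer ceiling division without floating point:
        //   ceil(totalLength / (double) minNumSplits)
        static long maxSplitSize(long totalLength, int minNumSplits) {
            // Callers are expected to have validated minNumSplits >= 1,
            // as createInputSplits does before reaching this computation.
            return totalLength / minNumSplits
                + (totalLength % minNumSplits == 0 ? 0 : 1);
        }
    }

    // e.g. maxSplitSize(10, 3) == 4, maxSplitSize(9, 3) == 3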
[3/6] flink git commit: [FLINK-7038] Correct misused terms (WindowedDataStream, JoinedDataStream)
Posted by tz...@apache.org.
[FLINK-7038] Correct misused terms (WindowedDataStream, JoinedDataStream)
This closes #4229.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9a0b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9a0b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9a0b96
Branch: refs/heads/release-1.3
Commit: 0c9a0b96212d7d1549e4d66b23ddbfd982b19b1d
Parents: 7ca13f2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Jul 4 19:12:48 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:51 2017 +0800
----------------------------------------------------------------------
docs/dev/scala_api_extensions.md | 8 +-
.../StreamingScalaAPICompletenessTest.scala | 4 +-
.../OnJoinedDataStreamTest.scala | 67 --------------
.../OnJoinedStreamTest.scala | 67 ++++++++++++++
.../OnWindowedDataStreamTest.scala | 97 --------------------
.../OnWindowedStreamTest.scala | 97 ++++++++++++++++++++
6 files changed, 170 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/docs/dev/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_api_extensions.md b/docs/dev/scala_api_extensions.md
index 53c0c98..283f50b 100644
--- a/docs/dev/scala_api_extensions.md
+++ b/docs/dev/scala_api_extensions.md
@@ -316,7 +316,7 @@ data.keyingBy(
</tr>
<tr>
<td><strong>reduceWith</strong></td>
- <td><strong>reduce (KeyedStream, WindowedDataStream)</strong></td>
+ <td><strong>reduce (KeyedStream, WindowedStream)</strong></td>
<td>
{% highlight scala %}
data.reduceWith {
@@ -327,7 +327,7 @@ data.reduceWith {
</tr>
<tr>
<td><strong>foldWith</strong></td>
- <td><strong>fold (KeyedStream, WindowedDataStream)</strong></td>
+ <td><strong>fold (KeyedStream, WindowedStream)</strong></td>
<td>
{% highlight scala %}
data.foldWith(User(bought = 0)) {
@@ -338,7 +338,7 @@ data.foldWith(User(bought = 0)) {
</tr>
<tr>
<td><strong>applyWith</strong></td>
- <td><strong>apply (WindowedDataStream)</strong></td>
+ <td><strong>apply (WindowedStream)</strong></td>
<td>
{% highlight scala %}
data.applyWith(0)(
@@ -350,7 +350,7 @@ data.applyWith(0)(
</tr>
<tr>
<td><strong>projecting</strong></td>
- <td><strong>apply (JoinedDataStream)</strong></td>
+ <td><strong>apply (JoinedStream)</strong></td>
<td>
{% highlight scala %}
data1.join(data2).
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 7cf6935..d8737e1 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -48,8 +48,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
"org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
- "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
- "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
+ "org.apache.flink.streaming.api.datastream.WindowedStream.getType",
+ "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionConfig",
"org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
"org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
deleted file mode 100644
index 34c55d7..0000000
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
-import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.junit.Test
-
-class OnJoinedDataStreamTest extends AcceptPFTestBase {
-
- @Test
- def testProjectingOnTuple(): Unit = {
- val test =
- tuples.join(tuples).
- where {
- case (id, _) => id
- }.equalTo {
- case (id, _) => id
- }.window {
- TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
- }.projecting {
- case ((_, v1), (_, v2)) => s"$v1 $v2"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "projecting should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testProjectingOnCaseClass(): Unit = {
- val test =
- caseObjects.join(caseObjects).
- where {
- case KeyValuePair(id, _) => id
- }.equalTo {
- case KeyValuePair(id, _) => id
- }.window {
- TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
- }.projecting {
- case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "projecting should produce a SingleOutputStreamOperator")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
new file mode 100644
index 0000000..2069997
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.Test
+
+class OnJoinedStreamTest extends AcceptPFTestBase {
+
+ @Test
+ def testProjectingOnTuple(): Unit = {
+ val test =
+ tuples.join(tuples).
+ where {
+ case (id, _) => id
+ }.equalTo {
+ case (id, _) => id
+ }.window {
+ TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+ }.projecting {
+ case ((_, v1), (_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "projecting should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testProjectingOnCaseClass(): Unit = {
+ val test =
+ caseObjects.join(caseObjects).
+ where {
+ case KeyValuePair(id, _) => id
+ }.equalTo {
+ case KeyValuePair(id, _) => id
+ }.window {
+ TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+ }.projecting {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "projecting should produce a SingleOutputStreamOperator")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
deleted file mode 100644
index 4fa9f5a..0000000
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
-import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
-import org.junit.Test
-
-class OnWindowedDataStreamTest extends AcceptPFTestBase {
-
- @Test
- def testReduceWithOnTuple(): Unit = {
- val test =
- windowedTuples.reduceWith {
- case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "reduceWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testReduceWithOnCaseClass(): Unit = {
- val test =
- windowedCaseObjects.reduceWith {
- case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "reduceWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testFoldWithOnTuple(): Unit = {
- val test =
- windowedTuples.foldWith("") {
- case (folding, (_, value)) => s"$folding $value"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "foldWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testFoldWithOnCaseClass(): Unit = {
- val test =
- windowedCaseObjects.foldWith("") {
- case (folding, KeyValuePair(_, value)) => s"$folding $value"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "foldWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testApplyWithOnTuple(): Unit = {
- val test =
- windowedTuples.applyWith("")(
- foldFunction = {
- case (folding, (_, value)) => s"$folding $value"
- },
- windowFunction = {
- case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
- })
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "applyWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testApplyWithOnCaseClass(): Unit = {
- val test =
- windowedCaseObjects.applyWith("")(
- foldFunction = {
- case (folding, KeyValuePair(_, value)) => s"$folding $value"
- },
- windowFunction = {
- case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
- })
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "applyWith should produce a SingleOutputStreamOperator")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
new file mode 100644
index 0000000..923b869
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnWindowedStreamTest extends AcceptPFTestBase {
+
+ @Test
+ def testReduceWithOnTuple(): Unit = {
+ val test =
+ windowedTuples.reduceWith {
+ case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "reduceWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testReduceWithOnCaseClass(): Unit = {
+ val test =
+ windowedCaseObjects.reduceWith {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "reduceWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testFoldWithOnTuple(): Unit = {
+ val test =
+ windowedTuples.foldWith("") {
+ case (folding, (_, value)) => s"$folding $value"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "foldWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testFoldWithOnCaseClass(): Unit = {
+ val test =
+ windowedCaseObjects.foldWith("") {
+ case (folding, KeyValuePair(_, value)) => s"$folding $value"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "foldWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testApplyWithOnTuple(): Unit = {
+ val test =
+ windowedTuples.applyWith("")(
+ foldFunction = {
+ case (folding, (_, value)) => s"$folding $value"
+ },
+ windowFunction = {
+ case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
+ })
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "applyWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testApplyWithOnCaseClass(): Unit = {
+ val test =
+ windowedCaseObjects.applyWith("")(
+ foldFunction = {
+ case (folding, KeyValuePair(_, value)) => s"$folding $value"
+ },
+ windowFunction = {
+ case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
+ })
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "applyWith should produce a SingleOutputStreamOperator")
+ }
+
+}
[4/6] flink git commit: [FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader
Posted by tz...@apache.org.
[FLINK-7041] Deserialize StateBackend from JobCheckpointingSettings with user classloader
This closes #4232.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78bbb844
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78bbb844
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78bbb844
Branch: refs/heads/release-1.3
Commit: 78bbb844a0b33d4297e9fa1044401a51e219fcfa
Parents: 0c9a0b9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Jun 30 11:19:30 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:57 2017 +0800
----------------------------------------------------------------------
.../executiongraph/ExecutionGraphBuilder.java | 16 +++---
.../tasks/JobCheckpointingSettings.java | 8 +--
.../CheckpointSettingsSerializableTest.java | 60 +++++++++++++++++++-
.../tasks/JobCheckpointingSettingsTest.java | 6 +-
.../api/graph/StreamingJobGraphGenerator.java | 18 +++++-
5 files changed, 92 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index db22da6..7cfc6e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -224,19 +224,21 @@ public class ExecutionGraphBuilder {
// if specified in the application, use from there, otherwise load from configuration
final StateBackend metadataBackend;
- final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
+ final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
if (applicationConfiguredBackend != null) {
- metadataBackend = applicationConfiguredBackend;
+ try {
+ metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e);
+ }
log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.",
- applicationConfiguredBackend);
- }
- else {
+ metadataBackend);
+ } else {
try {
metadataBackend = AbstractStateBackend
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
- }
- catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
+ } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
index a30a2ba..cc97e1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -56,7 +56,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
/** The default state backend, if configured by the user in the job */
@Nullable
- private final StateBackend defaultStateBackend;
+ private final SerializedValue<StateBackend> defaultStateBackend;
/** (Factories for) hooks that are executed on the checkpoint coordinator */
@Nullable
@@ -80,7 +80,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
ExternalizedCheckpointSettings externalizedCheckpointSettings,
- @Nullable StateBackend defaultStateBackend,
+ @Nullable SerializedValue<StateBackend> defaultStateBackend,
boolean isExactlyOnce) {
this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm,
@@ -97,7 +97,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
ExternalizedCheckpointSettings externalizedCheckpointSettings,
- @Nullable StateBackend defaultStateBackend,
+ @Nullable SerializedValue<StateBackend> defaultStateBackend,
@Nullable SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks,
boolean isExactlyOnce) {
@@ -155,7 +155,7 @@ public class JobCheckpointingSettings implements java.io.Serializable {
}
@Nullable
- public StateBackend getDefaultStateBackend() {
+ public SerializedValue<StateBackend> getDefaultStateBackend() {
return defaultStateBackend;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 0246180..f597757 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -18,11 +18,15 @@
package org.apache.flink.runtime.checkpoint;
+import java.io.IOException;
+import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -31,6 +35,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -43,6 +53,7 @@ import java.net.URLClassLoader;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -53,7 +64,7 @@ import static org.mockito.Mockito.when;
public class CheckpointSettingsSerializableTest extends TestLogger {
@Test
- public void testClassLoaderForCheckpointHooks() throws Exception {
+ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception {
final ClassLoader classLoader = new URLClassLoader(new URL[0], getClass().getClassLoader());
final Serializable outOfClassPath = CommonTestUtils.createObjectForClassNotInClassPath(classLoader);
@@ -70,7 +81,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
0L,
1,
ExternalizedCheckpointSettings.none(),
- null,
+ new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)),
serHooks,
true);
@@ -97,6 +108,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
log);
assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
+ assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);
}
// ------------------------------------------------------------------------
@@ -119,4 +131,48 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
return hook;
}
}
+
+ private static final class CustomStateBackend implements StateBackend {
+
+ /**
+ * Simulate a custom option that is not in the normal classpath.
+ */
+ private Serializable customOption;
+
+ public CustomStateBackend(Serializable customOption) {
+ this.customOption = customOption;
+ }
+
+ @Override
+ public CheckpointStreamFactory createStreamFactory(
+ JobID jobId, String operatorIdentifier) throws IOException {
+ return null;
+ }
+
+ @Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ @Nullable String targetLocation) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry) throws Exception {
+ return null;
+ }
+
+ @Override
+ public OperatorStateBackend createOperatorStateBackend(
+ Environment env, String operatorIdentifier) throws Exception {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
index c3524fa..097c296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.SerializedValue;
import org.junit.Test;
import java.util.Arrays;
@@ -45,7 +47,7 @@ public class JobCheckpointingSettingsTest {
112,
12,
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
- new MemoryStateBackend(),
+ new SerializedValue<StateBackend>(new MemoryStateBackend()),
false);
JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
@@ -60,6 +62,6 @@ public class JobCheckpointingSettingsTest {
assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
assertNotNull(copy.getDefaultStateBackend());
- assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
+ assertTrue(copy.getDefaultStateBackend().deserializeValue(this.getClass().getClassLoader()).getClass() == MemoryStateBackend.class);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78bbb844/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6d1af72..c807344 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -660,6 +661,21 @@ public class StreamingJobGraphGenerator {
}
}
+ // because the state backend can have user-defined code, it needs to be stored as
+ // eagerly serialized value
+ final SerializedValue<StateBackend> serializedStateBackend;
+ if (streamGraph.getStateBackend() == null) {
+ serializedStateBackend = null;
+ } else {
+ try {
+ serializedStateBackend =
+ new SerializedValue<StateBackend>(streamGraph.getStateBackend());
+ }
+ catch (IOException e) {
+ throw new FlinkRuntimeException("State backend is not serializable", e);
+ }
+ }
+
// --- done, put it all together ---
JobCheckpointingSettings settings = new JobCheckpointingSettings(
@@ -667,7 +683,7 @@ public class StreamingJobGraphGenerator {
cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
externalizedCheckpointSettings,
- streamGraph.getStateBackend(),
+ serializedStateBackend,
serializedHooks,
isExactlyOnce);
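The pattern here is Flink's usual user-code round trip: serialize eagerly where the object is created, deserialize with the user-code classloader where it is used. A minimal sketch using the same SerializedValue calls as the patch (the helper class and variable names are assumptions):

    import java.io.IOException;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.util.SerializedValue;

    class StateBackendShipping {

        // Client side: serialize eagerly, so the bytes can travel to a JVM
        // that does not have the user's classes on its own classpath.
        static SerializedValue<StateBackend> ship(StateBackend userBackend)
                throws IOException {
            return new SerializedValue<>(userBackend);
        }

        // Master side: deserialize with the user-code classloader, not the
        // system classloader, so user-defined backends resolve correctly.
        static StateBackend restore(
                SerializedValue<StateBackend> shipped,
                ClassLoader userCodeClassLoader)
                throws IOException, ClassNotFoundException {
            return shipped.deserializeValue(userCodeClassLoader);
        }
    }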
[5/6] flink git commit: [FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010
Posted by tz...@apache.org.
[FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010
This closes #4206.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6630dfdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6630dfdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6630dfdd
Branch: refs/heads/release-1.3
Commit: 6630dfdd748dee9c2fa6a0993497dcf3468a0948
Parents: 78bbb84
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Jun 26 11:28:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:10:18 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer010.java | 17 +-
.../kafka/Kafka010ProducerITCase.java | 13 +-
.../kafka/KafkaTestEnvironmentImpl.java | 37 +++
.../connectors/kafka/Kafka08ProducerITCase.java | 13 +-
.../kafka/KafkaTestEnvironmentImpl.java | 36 +++
.../connectors/kafka/Kafka09ProducerITCase.java | 14 +-
.../kafka/KafkaTestEnvironmentImpl.java | 36 +++
.../connectors/kafka/KafkaConsumerTestBase.java | 15 +-
.../connectors/kafka/KafkaProducerTestBase.java | 253 ++++++++++++++++++-
.../connectors/kafka/KafkaTestEnvironment.java | 16 ++
10 files changed, 420 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 711fe07..7909ba6 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -24,6 +24,9 @@ import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -66,7 +69,7 @@ import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase
*
* All methods and constructors in this class are marked with the approach they are needed for.
*/
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
/**
* Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -418,6 +421,18 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
invokeInternal(element.getValue(), element.getTimestamp());
}
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.initializeState(context);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.snapshotState(context);
+ }
+
/**
* Configuration object returned by the writeToKafkaWithTimestamps() call.
*/
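For context: with FlinkKafkaProducer010 now implementing CheckpointedFunction, snapshotState() flushes any records the wrapped producer has buffered before a checkpoint completes, which is what backs the at-least-once guarantee. A minimal usage sketch - the input stream, topic name, and broker address below are illustrative, not taken from this commit:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerAtLeastOnceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // checkpoints now also flush pending Kafka records

            DataStream<String> stream = env.fromElements("a", "b", "c"); // illustrative input

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative address

            // writeToKafkaWithTimestamps attaches the sink and returns its configuration object
            FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
                FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                    stream, "example-topic", new SimpleStringSchema(), props);

            // make each checkpoint wait until Kafka has acknowledged all in-flight records
            config.setFlushOnCheckpoint(true);

            env.execute("at-least-once producer example");
        }
    }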
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index 42b9682..f811893 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -18,16 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-
+/**
+ * IT cases for the {@link FlinkKafkaProducer010}.
+ */
@SuppressWarnings("serial")
public class Kafka010ProducerITCase extends KafkaProducerTestBase {
-
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c88c858..3094172 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -25,8 +25,10 @@ import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
@@ -35,6 +37,8 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSeri
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -46,7 +50,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -116,6 +123,31 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+ List<ConsumerRecord<K, V>> result = new ArrayList<>();
+ KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+ consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+ while (true) {
+ boolean processedAtLeastOneRecord = false;
+
+ // wait for new records with timeout and break the loop if we didn't get any
+ Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+ while (iterator.hasNext()) {
+ ConsumerRecord<K, V> record = iterator.next();
+ result.add(record);
+ processedAtLeastOneRecord = true;
+ }
+
+ if (!processedAtLeastOneRecord) {
+ break;
+ }
+ }
+
+ return UnmodifiableList.decorate(result);
+ }
+
+ @Override
public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
@@ -131,6 +163,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+ return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props);
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
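The getAllRecordsFromTopic implementations added in this commit drain a topic partition by polling until a poll returns no records, so a test can assert directly on what actually reached Kafka. A sketch of calling it from a test method, assuming it runs inside a KafkaTestBase subclass where kafkaServer and standardProps are in scope; the topic name, partition, and expected values are hypothetical:

    Properties consumerProps = new Properties();
    consumerProps.putAll(standardProps); // broker address etc. from the test base
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

    // keep polling partition 0 of "example-topic" until a 100 ms poll comes back empty
    Collection<ConsumerRecord<Integer, Integer>> records =
        kafkaServer.getAllRecordsFromTopic(consumerProps, "example-topic", 0, 100);

    Set<Integer> values = new HashSet<>();
    for (ConsumerRecord<Integer, Integer> record : records) {
        values.add(record.value());
    }
    // at-least-once: duplicates are acceptable, missing elements are not
    assertTrue(values.containsAll(expectedValues));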
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 5c951db..6710a77 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -18,15 +18,16 @@
package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
@SuppressWarnings("serial")
public class Kafka08ProducerITCase extends KafkaProducerTestBase {
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
+ @Override
+ public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+ // TODO: enable this test for Kafka 0.8 - currently it hangs indefinitely
}
+ @Override
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ // Disable this test since FlinkKafkaProducer08 doesn't support custom operator mode
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 2419b53..c0e6939 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -25,12 +25,14 @@ import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
@@ -41,6 +43,12 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSeri
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
@@ -50,8 +58,10 @@ import java.io.IOException;
import java.net.BindException;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -105,6 +115,27 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+ List<ConsumerRecord<K, V>> result = new ArrayList<>();
+ KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+ consumer.subscribe(new TopicPartition(topic, partition));
+
+ while (true) {
+ Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
+ if (topics == null || !topics.containsKey(topic)) {
+ break;
+ }
+ List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
+ result.addAll(records);
+ if (records.size() == 0) {
+ break;
+ }
+ }
+
+ return UnmodifiableList.decorate(result);
+ }
+
+ @Override
public <T> StreamSink<T> getProducerSink(
String topic,
KeyedSerializationSchema<T> serSchema,
@@ -127,6 +158,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index ae4f5b2..847f818 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -18,15 +18,13 @@
package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
+/**
+ * IT cases for the {@link FlinkKafkaProducer09}.
+ */
@SuppressWarnings("serial")
public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
+ @Override
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ // Disable this test since FlinkKafkaProducer09 doesn't support custom operator mode
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 84fdbf8..3149308 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -27,8 +27,10 @@ import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
@@ -37,6 +39,8 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSeri
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -48,7 +52,10 @@ import scala.collection.Seq;
import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -102,6 +109,30 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+ List<ConsumerRecord<K, V>> result = new ArrayList<>();
+ KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+ consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+ while (true) {
+ boolean processedAtLeastOneRecord = false;
+
+ Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+ while (iterator.hasNext()) {
+ ConsumerRecord<K, V> record = iterator.next();
+ result.add(record);
+ processedAtLeastOneRecord = true;
+ }
+
+ if (!processedAtLeastOneRecord) {
+ break;
+ }
+ }
+
+ return UnmodifiableList.decorate(result);
+ }
+
+ @Override
public <T> StreamSink<T> getProducerSink(
String topic,
KeyedSerializationSchema<T> serSchema,
@@ -120,6 +151,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 285bcf9..1de8e6f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -788,7 +788,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
- topic, parallelism, numElementsPerPartition, true);
+ topic,
+ parallelism,
+ numElementsPerPartition,
+ true);
// run the topology that fails and recovers
@@ -837,7 +840,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
- topic, numPartitions, numElementsPerPartition, false);
+ topic,
+ numPartitions,
+ numElementsPerPartition,
+ false);
// run the topology that fails and recovers
@@ -885,7 +891,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
- topic, numPartitions, numElementsPerPartition, true);
+ topic,
+ numPartitions,
+ numElementsPerPartition,
+ true);
// run the topology that fails and recovers
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index bcc8328..dc76249 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -18,26 +18,47 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Test;
+
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
@@ -74,7 +95,8 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
* Each topic also has a final sink that validates that there are no duplicates and that all
* partitions are present.
*/
- public void runCustomPartitioningTest() {
+ @Test
+ public void testCustomPartitioning() {
try {
LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
@@ -130,7 +152,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
props.putAll(secureProps);
// sink partitions into
- kafkaServer.produceIntoKafka(stream, defaultTopic,
+ kafkaServer.produceIntoKafka(
+ stream,
+ defaultTopic,
// this serialization schema will route between the default topic and dynamic topic
new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
props,
@@ -169,6 +193,145 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
}
}
+ /**
+ * Tests the at-least-once semantic for simple writes into Kafka through a regular sink.
+ */
+ @Test
+ public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+ testOneToOneAtLeastOnce(true);
+ }
+
+ /**
+ * Tests the at-least-once semantic for writes into Kafka through a custom operator.
+ */
+ @Test
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ testOneToOneAtLeastOnce(false);
+ }
+
+ /**
+ * This test configures the KafkaProducer so that it does not automatically flush data,
+ * and fails the broker to check whether FlinkKafkaProducer flushes pending records on snapshotState.
+ */
+ protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
+ final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
+ final int partition = 0;
+ final int numElements = 1000;
+ final int failAfterElements = 333;
+
+ createTestTopic(topic, 1, 1);
+
+ TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(500);
+ env.setParallelism(1);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+
+ Properties properties = new Properties();
+ properties.putAll(standardProps);
+ properties.putAll(secureProps);
+ // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not yet flushed) data on close()
+ properties.setProperty("timeout.ms", "10000");
+ properties.setProperty("max.block.ms", "10000");
+ // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
+ properties.setProperty("batch.size", "10240000");
+ properties.setProperty("linger.ms", "10000");
+
+ int leaderId = kafkaServer.getLeaderToShutDown(topic);
+ BrokerRestartingMapper.resetState();
+
+ // process exactly failAfterElements elements, then shut down the Kafka broker and fail the application
+ DataStream<Integer> inputStream = env
+ .fromCollection(getIntegersSequence(numElements))
+ .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+
+ StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ @Override
+ public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return partition;
+ }
+ });
+
+ if (regularSink) {
+ inputStream.addSink(kafkaSink.getUserFunction());
+ }
+ else {
+ kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ @Override
+ public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return partition;
+ }
+ });
+ }
+
+ FailingIdentityMapper.failedBefore = false;
+ try {
+ env.execute("One-to-one at least once test");
+ fail("Job should fail!");
+ }
+ catch (JobExecutionException ex) {
+ assertEquals("Broker was shutdown!", ex.getCause().getMessage());
+ }
+
+ kafkaServer.restartBroker(leaderId);
+
+ // assert that we successfully snapshotted/flushed all expected elements before the failure
+ assertAtLeastOnceForTopic(
+ properties,
+ topic,
+ partition,
+ ImmutableSet.copyOf(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot)),
+ 30000L);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * We handle the timeout manually instead of using JUnit's timeout, so that hitting it reports an assertion failure rather than a timeout error.
+ * Once the timeout expires, we assume that records are missing because of a bug, not that the test simply ran out of time.
+ */
+ private void assertAtLeastOnceForTopic(
+ Properties properties,
+ String topic,
+ int partition,
+ Set<Integer> expectedElements,
+ long timeoutMillis) throws Exception {
+
+ long startMillis = System.currentTimeMillis();
+ Set<Integer> actualElements = new HashSet<>();
+
+ // until we time out...
+ while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+
+ // query Kafka for new records ...
+ Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+
+ for (ConsumerRecord<Integer, Integer> record : records) {
+ actualElements.add(record.value());
+ }
+
+ // succeed if we got all expectedElements
+ if (actualElements.containsAll(expectedElements)) {
+ return;
+ }
+ }
+
+ fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+ }
+
+ private List<Integer> getIntegersSequence(int size) {
+ List<Integer> result = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ result.add(i);
+ }
+ return result;
+ }
+
// ------------------------------------------------------------------------
public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
@@ -263,4 +426,90 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
}
}
}
+
+ private static class BrokerRestartingMapper<T> extends RichMapFunction<T, T>
+ implements CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 6334389850158707313L;
+
+ public static volatile boolean restartedLeaderBefore;
+ public static volatile boolean hasBeenCheckpointedBeforeFailure;
+ public static volatile int numElementsBeforeSnapshot;
+
+ private final int shutdownBrokerId;
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+ private boolean hasBeenCheckpointed;
+
+ public static void resetState() {
+ restartedLeaderBefore = false;
+ hasBeenCheckpointedBeforeFailure = false;
+ numElementsBeforeSnapshot = 0;
+ }
+
+ public BrokerRestartingMapper(int shutdownBrokerId, int failCount) {
+ this.shutdownBrokerId = shutdownBrokerId;
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!restartedLeaderBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ // shut down a Kafka broker
+ KafkaServer toShutDown = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
+ if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
+ toShutDown = server;
+ break;
+ }
+ }
+
+ if (toShutDown == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+ + " ; available brokers: " + listOfBrokers.toString());
+ } else {
+ hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+ restartedLeaderBefore = true;
+ toShutDown.shutdown();
+ toShutDown.awaitShutdown();
+ throw new Exception("Broker was shutdown!");
+ }
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ hasBeenCheckpointed = true;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ numElementsBeforeSnapshot = numElementsTotal;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ }
+ }
}
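A note on the design above: BrokerRestartingMapper coordinates through static volatile fields (restartedLeaderBefore, numElementsBeforeSnapshot) because the mapper instance is serialized into the job while the assertions run in the test method; since these tests execute on an in-process Flink mini cluster, statics in the test JVM are the simplest shared channel. A stripped-down sketch of the same failure-injection pattern, with hypothetical names:

    import org.apache.flink.api.common.functions.RichMapFunction;

    // runs inside the job; the test JVM can read the static flag afterwards
    public class FailOnceMapper<T> extends RichMapFunction<T, T> {
        // static + volatile: written by the task thread, read by the test thread
        public static volatile boolean failedBefore;

        private final int failAfter;
        private int seen;

        public FailOnceMapper(int failAfter) {
            this.failAfter = failAfter;
        }

        @Override
        public T map(T value) throws Exception {
            if (!failedBefore && ++seen >= failAfter) {
                failedBefore = true;
                throw new Exception("Injected failure"); // forces the recovery path under test
            }
            return value;
        }
    }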
http://git-wip-us.apache.org/repos/asf/flink/blob/6630dfdd/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 311a1a4..7b2a42d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -31,6 +31,10 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.Collection;
+
/**
* Abstract class providing a Kafka test environment
*/
@@ -79,6 +83,12 @@ public abstract class KafkaTestEnvironment {
public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
+ public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
+ Properties properties,
+ String topic,
+ int partition,
+ long timeout);
+
public abstract <T> StreamSink<T> getProducerSink(String topic,
KeyedSerializationSchema<T> serSchema, Properties props,
FlinkKafkaPartitioner<T> partitioner);
@@ -87,6 +97,12 @@ public abstract class KafkaTestEnvironment {
KeyedSerializationSchema<T> serSchema, Properties props,
FlinkKafkaPartitioner<T> partitioner);
+ public abstract <T> DataStreamSink<T> writeToKafkaWithTimestamps(
+ DataStream<T> stream,
+ String topic,
+ KeyedSerializationSchema<T> serSchema,
+ Properties props);
+
// -- offset handlers
public interface KafkaOffsetHandler {