You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/06 04:26:01 UTC
[3/6] flink git commit: [FLINK-7038] Correct misused terms (WindowedDataStream, JoinedDataStream)
[FLINK-7038] Correct misused terms (WindowedDataStream, JoinedDataStream)
This closes #4229.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9a0b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9a0b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9a0b96
Branch: refs/heads/release-1.3
Commit: 0c9a0b96212d7d1549e4d66b23ddbfd982b19b1d
Parents: 7ca13f2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Jul 4 19:12:48 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jul 5 18:04:51 2017 +0800
----------------------------------------------------------------------
docs/dev/scala_api_extensions.md | 8 +-
.../StreamingScalaAPICompletenessTest.scala | 4 +-
.../OnJoinedDataStreamTest.scala | 67 --------------
.../OnJoinedStreamTest.scala | 67 ++++++++++++++
.../OnWindowedDataStreamTest.scala | 97 --------------------
.../OnWindowedStreamTest.scala | 97 ++++++++++++++++++++
6 files changed, 170 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/docs/dev/scala_api_extensions.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_api_extensions.md b/docs/dev/scala_api_extensions.md
index 53c0c98..283f50b 100644
--- a/docs/dev/scala_api_extensions.md
+++ b/docs/dev/scala_api_extensions.md
@@ -316,7 +316,7 @@ data.keyingBy(
</tr>
<tr>
<td><strong>reduceWith</strong></td>
- <td><strong>reduce (KeyedStream, WindowedDataStream)</strong></td>
+ <td><strong>reduce (KeyedStream, WindowedStream)</strong></td>
<td>
{% highlight scala %}
data.reduceWith {
@@ -327,7 +327,7 @@ data.reduceWith {
</tr>
<tr>
<td><strong>foldWith</strong></td>
- <td><strong>fold (KeyedStream, WindowedDataStream)</strong></td>
+ <td><strong>fold (KeyedStream, WindowedStream)</strong></td>
<td>
{% highlight scala %}
data.foldWith(User(bought = 0)) {
@@ -338,7 +338,7 @@ data.foldWith(User(bought = 0)) {
</tr>
<tr>
<td><strong>applyWith</strong></td>
- <td><strong>apply (WindowedDataStream)</strong></td>
+ <td><strong>apply (WindowedStream)</strong></td>
<td>
{% highlight scala %}
data.applyWith(0)(
@@ -350,7 +350,7 @@ data.applyWith(0)(
</tr>
<tr>
<td><strong>projecting</strong></td>
- <td><strong>apply (JoinedDataStream)</strong></td>
+ <td><strong>apply (JoinedStream)</strong></td>
<td>
{% highlight scala %}
data1.join(data2).
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 7cf6935..d8737e1 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -48,8 +48,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
"org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
- "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
- "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
+ "org.apache.flink.streaming.api.datastream.WindowedStream.getType",
+ "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionConfig",
"org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
"org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
deleted file mode 100644
index 34c55d7..0000000
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
-import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.junit.Test
-
-class OnJoinedDataStreamTest extends AcceptPFTestBase {
-
- @Test
- def testProjectingOnTuple(): Unit = {
- val test =
- tuples.join(tuples).
- where {
- case (id, _) => id
- }.equalTo {
- case (id, _) => id
- }.window {
- TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
- }.projecting {
- case ((_, v1), (_, v2)) => s"$v1 $v2"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "projecting should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testProjectingOnCaseClass(): Unit = {
- val test =
- caseObjects.join(caseObjects).
- where {
- case KeyValuePair(id, _) => id
- }.equalTo {
- case KeyValuePair(id, _) => id
- }.window {
- TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
- }.projecting {
- case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "projecting should produce a SingleOutputStreamOperator")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
new file mode 100644
index 0000000..2069997
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStreamTest.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.Test
+
+class OnJoinedStreamTest extends AcceptPFTestBase {
+
+ @Test
+ def testProjectingOnTuple(): Unit = {
+ val test =
+ tuples.join(tuples).
+ where {
+ case (id, _) => id
+ }.equalTo {
+ case (id, _) => id
+ }.window {
+ TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+ }.projecting {
+ case ((_, v1), (_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "projecting should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testProjectingOnCaseClass(): Unit = {
+ val test =
+ caseObjects.join(caseObjects).
+ where {
+ case KeyValuePair(id, _) => id
+ }.equalTo {
+ case KeyValuePair(id, _) => id
+ }.window {
+ TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))
+ }.projecting {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "projecting should produce a SingleOutputStreamOperator")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
deleted file mode 100644
index 4fa9f5a..0000000
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
-import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
-import org.junit.Test
-
-class OnWindowedDataStreamTest extends AcceptPFTestBase {
-
- @Test
- def testReduceWithOnTuple(): Unit = {
- val test =
- windowedTuples.reduceWith {
- case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "reduceWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testReduceWithOnCaseClass(): Unit = {
- val test =
- windowedCaseObjects.reduceWith {
- case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "reduceWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testFoldWithOnTuple(): Unit = {
- val test =
- windowedTuples.foldWith("") {
- case (folding, (_, value)) => s"$folding $value"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "foldWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testFoldWithOnCaseClass(): Unit = {
- val test =
- windowedCaseObjects.foldWith("") {
- case (folding, KeyValuePair(_, value)) => s"$folding $value"
- }
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "foldWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testApplyWithOnTuple(): Unit = {
- val test =
- windowedTuples.applyWith("")(
- foldFunction = {
- case (folding, (_, value)) => s"$folding $value"
- },
- windowFunction = {
- case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
- })
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "applyWith should produce a SingleOutputStreamOperator")
- }
-
- @Test
- def testApplyWithOnCaseClass(): Unit = {
- val test =
- windowedCaseObjects.applyWith("")(
- foldFunction = {
- case (folding, KeyValuePair(_, value)) => s"$folding $value"
- },
- windowFunction = {
- case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
- })
- assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
- "applyWith should produce a SingleOutputStreamOperator")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9a0b96/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
new file mode 100644
index 0000000..923b869
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStreamTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase
+import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair
+import org.junit.Test
+
+class OnWindowedStreamTest extends AcceptPFTestBase {
+
+ @Test
+ def testReduceWithOnTuple(): Unit = {
+ val test =
+ windowedTuples.reduceWith {
+ case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "reduceWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testReduceWithOnCaseClass(): Unit = {
+ val test =
+ windowedCaseObjects.reduceWith {
+ case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2")
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "reduceWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testFoldWithOnTuple(): Unit = {
+ val test =
+ windowedTuples.foldWith("") {
+ case (folding, (_, value)) => s"$folding $value"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "foldWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testFoldWithOnCaseClass(): Unit = {
+ val test =
+ windowedCaseObjects.foldWith("") {
+ case (folding, KeyValuePair(_, value)) => s"$folding $value"
+ }
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "foldWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testApplyWithOnTuple(): Unit = {
+ val test =
+ windowedTuples.applyWith("")(
+ foldFunction = {
+ case (folding, (_, value)) => s"$folding $value"
+ },
+ windowFunction = {
+ case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
+ })
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "applyWith should produce a SingleOutputStreamOperator")
+ }
+
+ @Test
+ def testApplyWithOnCaseClass(): Unit = {
+ val test =
+ windowedCaseObjects.applyWith("")(
+ foldFunction = {
+ case (folding, KeyValuePair(_, value)) => s"$folding $value"
+ },
+ windowFunction = {
+ case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck)
+ })
+ assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]],
+ "applyWith should produce a SingleOutputStreamOperator")
+ }
+
+}