You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:22 UTC
[40/60] git commit: [scala] Reactivate DeltaIterationSanityCheckTest
[scala] Reactivate DeltaIterationSanityCheckTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fd280981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fd280981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fd280981
Branch: refs/heads/master
Commit: fd2809813a0b98e283d0c3e566fcc56425566ca2
Parents: 31ed0c4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Sep 16 12:16:18 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:59 2014 +0200
----------------------------------------------------------------------
.../scala/DeltaIterationSanityCheckTest.scala | 361 +++++++++----------
1 file changed, 169 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd280981/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index d5b0d24..094c1b4 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -1,192 +1,169 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.api.scala
-//
-//import org.junit.Test
-//import org.apache.flink.api.common.InvalidProgramException
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//import org.scalatest.junit.AssertionsForJUnit
-//
-//// Verify that the sanity checking in delta iterations works. We just
-//// have a dummy job that is not meant to be executed. Only verify that
-//// the join/coGroup inside the iteration is checked.
-//class DeltaIterationSanityCheckTest extends Serializable {
-//
-// @Test
-// def testCorrectJoinWithSolution1 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test
-// def testCorrectJoinWithSolution2 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test(expected = classOf[InvalidProgramException])
-// def testIncorrectJoinWithSolution1 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test(expected = classOf[InvalidProgramException])
-// def testIncorrectJoinWithSolution2 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test(expected = classOf[InvalidProgramException])
-// def testIncorrectJoinWithSolution3 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test
-// def testCorrectCoGroupWithSolution1 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test
-// def testCorrectCoGroupWithSolution2 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test(expected = classOf[InvalidProgramException])
-// def testIncorrectCoGroupWithSolution1 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test(expected = classOf[InvalidProgramException])
-// def testIncorrectCoGroupWithSolution2 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//
-// @Test(expected = classOf[InvalidProgramException])
-// def testIncorrectCoGroupWithSolution3 {
-// val solutionInput = CollectionDataSource(Array((1, "1")))
-// val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-// val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-// (result, ws)
-// }
-// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-//
-// val output = iteration.write("/dummy", CsvOutputFormat())
-//
-// val plan = new ScalaPlan(Seq(output))
-// }
-//}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.junit.Test
+import org.apache.flink.api.common.InvalidProgramException
+
+import org.apache.flink.api.scala._
+import org.scalatest.junit.AssertionsForJUnit
+
+// Verify that the sanity checking in delta iterations works. We just
+// have a dummy job that is not meant to be executed. Only verify that
+// the join/coGroup inside the iteration is checked.
+class DeltaIterationSanityCheckTest extends Serializable {
+
+ @Test
+ def testCorrectJoinWithSolution1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+
+ @Test
+ def testCorrectJoinWithSolution2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+
+ @Test(expected = classOf[InvalidProgramException])
+ def testIncorrectJoinWithSolution1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+
+ @Test(expected = classOf[InvalidProgramException])
+ def testIncorrectJoinWithSolution2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
+ (result, ws)
+ }
+
+ val output = iteration.print() }
+
+ @Test(expected = classOf[InvalidProgramException])
+ def testIncorrectJoinWithSolution3(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+ val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+
+ @Test
+ def testCorrectCoGroupWithSolution1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+
+ @Test
+ def testCorrectCoGroupWithSolution2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+
+ @Test(expected = classOf[InvalidProgramException])
+ def testIncorrectCoGroupWithSolution1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+
+ @Test(expected = classOf[InvalidProgramException])
+ def testIncorrectCoGroupWithSolution2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+ val result = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
+ (result, ws)
+ }
+
+ val output = iteration.print() }
+
+ @Test(expected = classOf[InvalidProgramException])
+ def testIncorrectCoGroupWithSolution3(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val solutionInput = env.fromElements((1, "1"))
+ val worksetInput = env.fromElements((2, "2"))
+
+ val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+ val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+ (result, ws)
+ }
+
+ val output = iteration.print()
+ }
+}