You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:22 UTC

[40/60] git commit: [scala] Reactivate DeltaIterationSanityCheckTest

[scala] Reactivate DeltaIterationSanityCheckTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fd280981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fd280981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fd280981

Branch: refs/heads/master
Commit: fd2809813a0b98e283d0c3e566fcc56425566ca2
Parents: 31ed0c4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Sep 16 12:16:18 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:59 2014 +0200

----------------------------------------------------------------------
 .../scala/DeltaIterationSanityCheckTest.scala   | 361 +++++++++----------
 1 file changed, 169 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fd280981/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index d5b0d24..094c1b4 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -1,192 +1,169 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.api.scala
-//
-//import org.junit.Test
-//import org.apache.flink.api.common.InvalidProgramException
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//import org.scalatest.junit.AssertionsForJUnit
-//
-//// Verify that the sanity checking in delta iterations works. We just
-//// have a dummy job that is not meant to be executed. Only verify that
-//// the join/coGroup inside the iteration is checked.
-//class DeltaIterationSanityCheckTest extends Serializable {
-//
-//  @Test
-//  def testCorrectJoinWithSolution1 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test
-//  def testCorrectJoinWithSolution2 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test(expected = classOf[InvalidProgramException])
-//  def testIncorrectJoinWithSolution1 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test(expected = classOf[InvalidProgramException])
-//  def testIncorrectJoinWithSolution2 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test(expected = classOf[InvalidProgramException])
-//  def testIncorrectJoinWithSolution3 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//   }
-//
-//  @Test
-//  def testCorrectCoGroupWithSolution1 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test
-//  def testCorrectCoGroupWithSolution2 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test(expected = classOf[InvalidProgramException])
-//  def testIncorrectCoGroupWithSolution1 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test(expected = classOf[InvalidProgramException])
-//  def testIncorrectCoGroupWithSolution2 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//
-//  @Test(expected = classOf[InvalidProgramException])
-//  def testIncorrectCoGroupWithSolution3 {
-//    val solutionInput = CollectionDataSource(Array((1, "1")))
-//    val worksetInput = CollectionDataSource(Array((2, "2")))
-//
-//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
-//      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-//      (result, ws)
-//    }
-//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-//
-//    val output = iteration.write("/dummy", CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output))
-//  }
-//}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.junit.Test
+import org.apache.flink.api.common.InvalidProgramException
+
+import org.apache.flink.api.scala._
+import org.scalatest.junit.AssertionsForJUnit
+
+// Verify that the sanity checking in delta iterations works. We just
+// have a dummy job that is not meant to be executed. Only verify that
+// the join/coGroup inside the iteration is checked.
+class DeltaIterationSanityCheckTest extends Serializable {
+
+  @Test
+  def testCorrectJoinWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+  }
+
+  @Test
+  def testCorrectJoinWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
+      (result, ws)
+    }
+
+    val output = iteration.print()  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+   }
+
+  @Test
+  def testCorrectCoGroupWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+  }
+
+  @Test
+  def testCorrectCoGroupWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    val output = iteration.print()  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    val output = iteration.print()
+  }
+}