You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:04 UTC

[68/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 64f04cb..78426ef 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -21,222 +21,212 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.apache.flink.util.Collector
-import org.junit.Assert
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit._
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
-object MapProgs {
-  var NUM_PROGRAMS: Int = 9
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Test identity map with basic type
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getStringDataSet(env)
-        val identityMapDs = ds.map( t => t)
-        identityMapDs.writeAsText(resultPath)
-        env.execute()
-        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
-          ".\n" + "Luke Skywalker\n" + "Random comment\n" + "LOL\n"
-      
-      case 2 =>
-        /*
-         * Test identity map with a tuple
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val identityMapDs = ds.map( t => t )
-        identityMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-          "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-          "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-          "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-          "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-          "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-      
-      case 3 =>
-        /*
-         * Test type conversion mapper (Custom -> Tuple)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val typeConversionMapDs = ds.map( c => (c.myInt, c.myLong, c.myString) )
-        typeConversionMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
-          "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
-          "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
-          "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
-          "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
-          "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
-      
-      case 4 =>
-        /*
-         * Test type conversion mapper (Tuple -> Basic)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val typeConversionMapDs = ds.map(_._3)
-        typeConversionMapDs.writeAsText(resultPath)
-        env.execute()
-        "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
-          ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
-          "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
-          "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
-          "Comment#14\n" + "Comment#15\n"
-      
-      case 5 =>
-        /*
-         * Test mapper on tuple - Increment Integer field, reorder second and third fields
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val tupleMapDs = ds.map( t => (t._1 + 1, t._3, t._2) )
-        tupleMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "2,Hi,1\n" + "3,Hello,2\n" + "4,Hello world,2\n" + "5,Hello world, how are you?," +
-          "3\n" + "6,I am fine.,3\n" + "7,Luke Skywalker,3\n" + "8,Comment#1,4\n" + "9,Comment#2," +
-          "4\n" + "10,Comment#3,4\n" + "11,Comment#4,4\n" + "12,Comment#5,5\n" + "13,Comment#6," +
-          "5\n" + "14,Comment#7,5\n" + "15,Comment#8,5\n" + "16,Comment#9,5\n" + "17,Comment#10," +
-          "6\n" + "18,Comment#11,6\n" + "19,Comment#12,6\n" + "20,Comment#13,6\n" + "21," +
-          "Comment#14,6\n" + "22,Comment#15,6\n"
-      
-      case 6 =>
-        /*
-         * Test mapper on Custom - lowercase myString
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val customMapDs = ds.map { c => c.myString = c.myString.toLowerCase; c }
-        customMapDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,hi\n" + "2,1,hello\n" + "2,2,hello world\n" + "3,3,hello world, " +
-          "how are you?\n" + "3,4,i am fine.\n" + "3,5,luke skywalker\n" + "4,6," +
-          "comment#1\n" + "4,7,comment#2\n" + "4,8,comment#3\n" + "4,9,comment#4\n" + "5,10," +
-          "comment#5\n" + "5,11,comment#6\n" + "5,12,comment#7\n" + "5,13,comment#8\n" + "5,14," +
-          "comment#9\n" + "6,15,comment#10\n" + "6,16,comment#11\n" + "6,17,comment#12\n" + "6," +
-          "18,comment#13\n" + "6,19,comment#14\n" + "6,20,comment#15\n"
-      
-      case 7 =>
-        /*
-         * Test mapper if UDF returns input object - increment first field of a tuple
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env).map {
-          t => MutableTuple3(t._1, t._2, t._3)
-        }
-        val inputObjMapDs = ds.map { t => t._1 = t._1 + 1; t }
-        inputObjMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n" + "5,3,Hello world, " +
-          "how are you?\n" + "6,3,I am fine.\n" + "7,3,Luke Skywalker\n" + "8,4," +
-          "Comment#1\n" + "9,4,Comment#2\n" + "10,4,Comment#3\n" + "11,4,Comment#4\n" + "12,5," +
-          "Comment#5\n" + "13,5,Comment#6\n" + "14,5,Comment#7\n" + "15,5,Comment#8\n" + "16,5," +
-          "Comment#9\n" + "17,6,Comment#10\n" + "18,6,Comment#11\n" + "19,6,Comment#12\n" + "20," +
-          "6,Comment#13\n" + "21,6,Comment#14\n" + "22,6,Comment#15\n"
-      
-      case 8 =>
-        /*
-         * Test map with broadcast set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ints = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val bcMapDs = ds.map(
-          new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
-            var f2Replace = 0
-            override def open(config: Configuration): Unit = {
-              val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              f2Replace = ints.sum
-            }
-            override def map(in: (Int, Long, String)): (Int, Long, String) = {
-              in.copy(_1 = f2Replace)
-            }
-          }).withBroadcastSet(ints, "ints")
-        bcMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
-          "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
-          "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
-          "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
-          "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
-          "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
-      
-      case 9 =>
-        /*
-         * Test passing configuration object.
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getSmall3TupleDataSet(env)
-        val conf = new Configuration
-        val testKey = "testVariable"
-        val testValue = 666
-        conf.setInteger(testKey, testValue)
-        val bcMapDs = ds.map(
-          new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
-            override def open(config: Configuration): Unit = {
-              val fromConfig = config.getInteger(testKey, -1)
-              Assert.assertEquals(testValue, fromConfig)
-            }
-            override def map(in: (Int, Long, String)): (Int, Long, String) = {
-              in
-            }
-          }).withParameters(conf)
-        bcMapDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world"
-      
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
-}
 
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-@RunWith(classOf[Parameterized])
-class MapITCase(config: Configuration) extends JavaProgramTestBase(config) {
+  @Test
+  def testIdentityMapperWithBasicType: Unit = {
+    /*
+     * Test identity map with basic type
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+    val identityMapDs = ds.map( t => t)
+    identityMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
+      ".\n" + "Luke Skywalker\n" + "Random comment\n" + "LOL\n"
+  }
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  @Test
+  def testIdentityMapperWithTuple: Unit = {
+    /*
+     * Test identity map with a tuple
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val identityMapDs = ds.map( t => t )
+    identityMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @Test
+  def testTypeConversionMapperCustomToTuple: Unit = {
+    /*
+     * Test type conversion mapper (Custom -> Tuple)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val typeConversionMapDs = ds.map( c => (c.myInt, c.myLong, c.myString) )
+    typeConversionMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
+      "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
+      "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
+      "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
+      "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
+      "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = MapProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testTypeConversionMapperTupleToBasic: Unit = {
+    /*
+     * Test type conversion mapper (Tuple -> Basic)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val typeConversionMapDs = ds.map(_._3)
+    typeConversionMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
+      ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
+      "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
+      "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
+      "Comment#14\n" + "Comment#15\n"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testMapperOnTupleIncrementFieldReorderSecondAndThirdFields: Unit = {
+    /*
+     * Test mapper on tuple - Increment Integer field, reorder second and third fields
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val tupleMapDs = ds.map( t => (t._1 + 1, t._3, t._2) )
+    tupleMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,Hi,1\n" + "3,Hello,2\n" + "4,Hello world,2\n" + "5,Hello world, how are you?," +
+      "3\n" + "6,I am fine.,3\n" + "7,Luke Skywalker,3\n" + "8,Comment#1,4\n" + "9,Comment#2," +
+      "4\n" + "10,Comment#3,4\n" + "11,Comment#4,4\n" + "12,Comment#5,5\n" + "13,Comment#6," +
+      "5\n" + "14,Comment#7,5\n" + "15,Comment#8,5\n" + "16,Comment#9,5\n" + "17,Comment#10," +
+      "6\n" + "18,Comment#11,6\n" + "19,Comment#12,6\n" + "20,Comment#13,6\n" + "21," +
+      "Comment#14,6\n" + "22,Comment#15,6\n"
+  }
+
+  @Test
+  def testMapperOnCustomLowercaseString: Unit = {
+    /*
+     * Test mapper on Custom - lowercase myString
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val customMapDs = ds.map { c => c.myString = c.myString.toLowerCase; c }
+    customMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,0,hi\n" + "2,1,hello\n" + "2,2,hello world\n" + "3,3,hello world, " +
+      "how are you?\n" + "3,4,i am fine.\n" + "3,5,luke skywalker\n" + "4,6," +
+      "comment#1\n" + "4,7,comment#2\n" + "4,8,comment#3\n" + "4,9,comment#4\n" + "5,10," +
+      "comment#5\n" + "5,11,comment#6\n" + "5,12,comment#7\n" + "5,13,comment#8\n" + "5,14," +
+      "comment#9\n" + "6,15,comment#10\n" + "6,16,comment#11\n" + "6,17,comment#12\n" + "6," +
+      "18,comment#13\n" + "6,19,comment#14\n" + "6,20,comment#15\n"
   }
-}
 
-object MapITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to MapProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
+  @Test
+  def testMapperIfUDFReturnsInputObjectIncrementFirstFieldOfTuple: Unit = {
+    /*
+     * Test mapper if UDF returns input object - increment first field of a tuple
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).map {
+      t => MutableTuple3(t._1, t._2, t._3)
     }
+    val inputObjMapDs = ds.map { t => t._1 = t._1 + 1; t }
+    inputObjMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n" + "5,3,Hello world, " +
+      "how are you?\n" + "6,3,I am fine.\n" + "7,3,Luke Skywalker\n" + "8,4," +
+      "Comment#1\n" + "9,4,Comment#2\n" + "10,4,Comment#3\n" + "11,4,Comment#4\n" + "12,5," +
+      "Comment#5\n" + "13,5,Comment#6\n" + "14,5,Comment#7\n" + "15,5,Comment#8\n" + "16,5," +
+      "Comment#9\n" + "17,6,Comment#10\n" + "18,6,Comment#11\n" + "19,6,Comment#12\n" + "20," +
+      "6,Comment#13\n" + "21,6,Comment#14\n" + "22,6,Comment#15\n"
+  }
 
-    configs.asJavaCollection
+  @Test
+  def testMapWithBroadcastSet: Unit = {
+    /*
+     * Test map with broadcast set
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ints = CollectionDataSets.getIntDataSet(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val bcMapDs = ds.map(
+      new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
+        var f2Replace = 0
+        override def open(config: Configuration): Unit = {
+          val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+          f2Replace = ints.sum
+        }
+        override def map(in: (Int, Long, String)): (Int, Long, String) = {
+          in.copy(_1 = f2Replace)
+        }
+      }).withBroadcastSet(ints, "ints")
+    bcMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
+      "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
+      "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
+      "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
+      "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
+      "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
   }
-}
 
+  @Test
+  def testPassingConfigurationObject: Unit = {
+    /*
+     * Test passing configuration object.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+    val conf = new Configuration
+    val testKey = "testVariable"
+    val testValue = 666
+    conf.setInteger(testKey, testValue)
+    val bcMapDs = ds.map(
+      new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
+        override def open(config: Configuration): Unit = {
+          val fromConfig = config.getInteger(testKey, -1)
+          Assert.assertEquals(testValue, fromConfig)
+        }
+        override def map(in: (Int, Long, String)): (Int, Long, String) = {
+          in
+        }
+      }).withParameters(conf)
+    bcMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 5d80830..35c0e93 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -20,199 +20,183 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction}
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
-object PartitionProgs {
-  var NUM_PROGRAMS: Int = 7
-
-  def runProgram(progId: Int, resultPath: String, onCollection: Boolean): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Test hash partition by tuple field
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
+  @Rule
+  def tempFolder = _tempFolder
 
-        val unique = ds.partitionByHash(1).mapPartition( _.map(_._2).toSet )
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
 
-        unique.writeAsText(resultPath)
-        env.execute()
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-        "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+  @Test
+  def testHashPartitionByTupleField: Unit = {
+    /*
+     * Test hash partition by tuple field
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
 
-      case 2 =>
-        /*
-         * Test hash partition by key selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val unique = ds.partitionByHash( _._2 ).mapPartition( _.map(_._2).toSet )
+    val unique = ds.partitionByHash(1).mapPartition( _.map(_._2).toSet )
 
-        unique.writeAsText(resultPath)
-        env.execute()
-        "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+    unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
 
-      case 3 =>
-        /*
-         * Test forced rebalancing
-         */
-      val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = env.generateSequence(1, 3000)
+    expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+  }
 
-        val skewed = ds.filter(_ > 780)
-        val rebalanced = skewed.rebalance()
+  @Test
+  def testHashPartitionByKeySelector: Unit = {
+    /*
+     * Test hash partition by key selector
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val unique = ds.partitionByHash( _._2 ).mapPartition( _.map(_._2).toSet )
+
+    unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+  }
 
-        val countsInPartition = rebalanced.map( new RichMapFunction[Long, (Int, Long)] {
-          def map(in: Long) = {
-            (getRuntimeContext.getIndexOfThisSubtask, 1)
-          }
-        })
-          .groupBy(0)
-          .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
-          // round counts to mitigate runtime scheduling effects (lazy split assignment)
-          .map { in => (in._1, in._2 / 10) }
-
-        countsInPartition.writeAsText(resultPath)
-        env.execute()
-
-        val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10
-        var result = ""
-        for (i <- 0 until env.getDegreeOfParallelism) {
-          result += "(" + i + "," + numPerPartition + ")\n"
-        }
-        result
-
-      case 4 =>
-        // Verify that mapPartition operation after repartition picks up correct
-        // DOP
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        env.setDegreeOfParallelism(1)
-
-        val unique = ds.partitionByHash(1)
-          .setParallelism(4)
-          .mapPartition( _.map(_._2).toSet )
-
-        unique.writeAsText(resultPath)
-        env.execute()
-
-        "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-
-      case 5 =>
-        // Verify that map operation after repartition picks up correct
-        // DOP
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        env.setDegreeOfParallelism(1)
-
-        val count = ds.partitionByHash(0).setParallelism(4).map(
-          new RichMapFunction[(Int, Long, String), Tuple1[Int]] {
-            var first = true
-            override def map(in: (Int, Long, String)): Tuple1[Int] = {
-              // only output one value with count 1
-              if (first) {
-                first = false
-                Tuple1(1)
-              } else {
-                Tuple1(0)
-              }
-            }
-          }).sum(0)
-
-        count.writeAsText(resultPath)
-        env.execute()
-
-        if (onCollection) "(1)\n" else "(4)\n"
-
-      case 6 =>
-        // Verify that filter operation after repartition picks up correct
-        // DOP
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        env.setDegreeOfParallelism(1)
-
-        val count = ds.partitionByHash(0).setParallelism(4).filter(
-          new RichFilterFunction[(Int, Long, String)] {
-            var first = true
-            override def filter(in: (Int, Long, String)): Boolean = {
-              // only output one value with count 1
-              if (first) {
-                first = false
-                true
-              } else {
-                false
-              }
-            }
-        })
-          .map( _ => Tuple1(1)).sum(0)
-
-        count.writeAsText(resultPath)
-        env.execute()
-
-        if (onCollection) "(1)\n" else "(4)\n"
-
-      case 7 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        env.setDegreeOfParallelism(3)
-        val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
-        val uniqLongs = ds
-          .partitionByHash("nestedPojo.longNumber")
-          .setParallelism(4)
-          .mapPartition( _.map(_.nestedPojo.longNumber).toSet )
-
-        uniqLongs.writeAsText(resultPath)
-        env.execute()
-        "10000\n" + "20000\n" + "30000\n"
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
+  @Test
+  def testForcedRebalancing: Unit = {
+    /*
+     * Test forced rebalancing
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.generateSequence(1, 3000)
+
+    val skewed = ds.filter(_ > 780)
+    val rebalanced = skewed.rebalance()
+
+    val countsInPartition = rebalanced.map( new RichMapFunction[Long, (Int, Long)] {
+      def map(in: Long) = {
+        (getRuntimeContext.getIndexOfThisSubtask, 1)
+      }
+    })
+      .groupBy(0)
+      .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
+      // round counts to mitigate runtime scheduling effects (lazy split assignment)
+      .map { in => (in._1, in._2 / 10) }
+
+    countsInPartition.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+
+    val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10
+    expected = ""
+    for (i <- 0 until env.getDegreeOfParallelism) {
+      expected += "(" + i + "," + numPerPartition + ")\n"
     }
   }
-}
 
+  @Test
+  def testMapPartitionAfterRepartitionHasCorrectDOP: Unit = {
+    // Verify that mapPartition operation after repartition picks up correct
+    // DOP
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    env.setDegreeOfParallelism(1)
 
-@RunWith(classOf[Parameterized])
-class PartitionITCase(config: Configuration) extends JavaProgramTestBase(config) {
+    val unique = ds.partitionByHash(1)
+      .setParallelism(4)
+      .mapPartition( _.map(_._2).toSet )
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+    unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+    expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = PartitionProgs.runProgram(curProgId, resultPath, isCollectionExecution)
-  }
+  @Test
+  def testMapAfterRepartitionHasCorrectDOP: Unit = {
+    // Verify that map operation after repartition picks up correct
+    // DOP
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    env.setDegreeOfParallelism(1)
+
+    val count = ds.partitionByHash(0).setParallelism(4).map(
+      new RichMapFunction[(Int, Long, String), Tuple1[Int]] {
+        var first = true
+        override def map(in: (Int, Long, String)): Tuple1[Int] = {
+          // only output one value with count 1
+          if (first) {
+            first = false
+            Tuple1(1)
+          } else {
+            Tuple1(0)
+          }
+        }
+      }).sum(0)
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+    count.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+
+    expected = if (mode == ExecutionMode.COLLECTION) "(1)\n" else "(4)\n"
   }
-}
 
-object PartitionITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to PartitionProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
+  @Test
+  def testFilterAfterRepartitionHasCorrectDOP: Unit = {
+    // Verify that filter operation after repartition picks up correct
+    // DOP
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    env.setDegreeOfParallelism(1)
+
+    val count = ds.partitionByHash(0).setParallelism(4).filter(
+      new RichFilterFunction[(Int, Long, String)] {
+        var first = true
+        override def filter(in: (Int, Long, String)): Boolean = {
+          // only output one value with count 1
+          if (first) {
+            first = false
+            true
+          } else {
+            false
+          }
+        }
+      })
+      .map( _ => Tuple1(1)).sum(0)
+
+    count.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
 
-    configs.asJavaCollection
+    expected = if (mode == ExecutionMode.COLLECTION) "(1)\n" else "(4)\n"
   }
-}
 
+  @Test
+  def testPartitionNestedPojo: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setDegreeOfParallelism(3)
+    val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
+    val uniqLongs = ds
+      .partitionByHash("nestedPojo.longNumber")
+      .setParallelism(4)
+      .mapPartition( _.map(_.nestedPojo.longNumber).toSet )
+
+    uniqLongs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "10000\n" + "20000\n" + "30000\n"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index 6f8af7e..5f63dc4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -21,216 +21,212 @@ import org.apache.flink.api.common.functions.RichReduceFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
+@RunWith(classOf[Parameterized])
+class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
-object ReduceProgs {
-  var NUM_PROGRAMS: Int = 10
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Reduce on tuples with key field selector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs = ds.groupBy(1)
-          .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
-
-      case 2 =>
-        /*
-         * Reduce on tuples with multiple key field selectors
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val reduceDs = ds.groupBy(4, 0)
-          .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
-          "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
-          "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
-      case 3 =>
-        /*
-         * Reduce on tuples with key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs = ds.groupBy(_._2)
-          .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
-
-      case 4 =>
-        /*
-         * Reduce on custom type with key extractor
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val reduceDs = ds.groupBy(_.myInt)
-          .reduce { (in1, in2) =>
-          in1.myLong += in2.myLong
-          in1.myString = "Hello!"
-          in1
-        }
-        reduceDs.writeAsText(resultPath)
-        env.execute()
-        "1,0,Hi\n" + "2,3,Hello!\n" + "3,12,Hello!\n" + "4,30,Hello!\n" + "5,60," +
-          "Hello!\n" + "6,105,Hello!\n"
-
-      case 5 =>
-        /*
-         * All-reduce for tuple
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs =
-          ds.reduce { (in1, in2) => (in1._1 + in2._1, in1._2 + in2._2, "Hello World") }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "231,91,Hello World\n"
-
-      case 6 =>
-        /*
-         * All-reduce for custom types
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.getCustomTypeDataSet(env)
-        val reduceDs = ds
-          .reduce { (in1, in2) =>
-          in1.myInt += in2.myInt
-          in1.myLong += in2.myLong
-          in1.myString = "Hello!"
-          in1
-        }
-        reduceDs.writeAsText(resultPath)
-        env.execute()
-        "91,210,Hello!"
-
-      case 7 =>
-        /*
-         * Reduce with broadcast set
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val intDs = CollectionDataSets.getIntDataSet(env)
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val reduceDs = ds.groupBy(1).reduce(
-          new RichReduceFunction[(Int, Long, String)] {
-            private var f2Replace = ""
-
-            override def open(config: Configuration) {
-              val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
-              f2Replace = ints.sum + ""
-            }
-
-            override def reduce(
-                in1: (Int, Long, String),
-                in2: (Int, Long, String)): (Int, Long, String) = {
-              (in1._1 + in2._1, in1._2, f2Replace)
-            }
-          }).withBroadcastSet(intDs, "ints")
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n"
-
-      case 8 =>
-        /*
-         * Reduce with UDF that returns the second input object (check mutable object handling)
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env).map (t => MutableTuple3(t._1, t._2, t._3))
-        val reduceDs = ds.groupBy(1).reduce(
-          new RichReduceFunction[MutableTuple3[Int, Long, String]] {
-            override def reduce(
-                in1: MutableTuple3[Int, Long, String],
-                in2: MutableTuple3[Int, Long, String]): MutableTuple3[Int, Long, String] = {
-              in2._1 = in1._1 + in2._1
-              in2._3 = "Hi again!"
-              in2
-            }
-          })
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,Hi\n" + "5,2,Hi again!\n" + "15,3,Hi again!\n" + "34,4,Hi again!\n" + "65,5," +
-          "Hi again!\n" + "111,6,Hi again!\n"
-
-      case 9 =>
-        /*
-         * Reduce with a Tuple-returning KeySelector
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val reduceDs = ds.groupBy(t => (t._1, t._5))
-          .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
-          "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
-          "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
-      case 10 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get5TupleDataSet(env)
-        val reduceDs = ds.groupBy("_5", "_1")
-          .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
-        reduceDs.writeAsCsv(resultPath)
-        env.execute()
-        "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
-          "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
-          "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
-      case id =>
-        throw new IllegalArgumentException(s"Invalid program id $id")
-    }
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
-}
 
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-@RunWith(classOf[Parameterized])
-class ReduceITCase(config: Configuration) extends JavaProgramTestBase(config) {
+  @Test
+  def testReduceOnTuplesWithKeyFieldSelector: Unit = {
+    /*
+     * Reduce on tuples with key field selector
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val reduceDs = ds.groupBy(1)
+      .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
+  }
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  @Test
+  def testReduceOnTuplesWithMultipleKeyFieldSelectors: Unit = {
+    /*
+     * Reduce on tuples with multiple key field selectors
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val reduceDs = ds.groupBy(4, 0)
+      .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
+      "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
+      "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
+  }
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @Test
+  def testReduceOnTuplesWithKeyExtractor: Unit = {
+    /*
+     * Reduce on tuples with key extractor
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val reduceDs = ds.groupBy(_._2)
+      .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = ReduceProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testReduceOnCustomTypeWithKeyExtractor: Unit = {
+    /*
+     * Reduce on custom type with key extractor
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val reduceDs = ds.groupBy(_.myInt)
+      .reduce { (in1, in2) =>
+      in1.myLong += in2.myLong
+      in1.myString = "Hello!"
+      in1
+    }
+    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,0,Hi\n" + "2,3,Hello!\n" + "3,12,Hello!\n" + "4,30,Hello!\n" + "5,60," +
+      "Hello!\n" + "6,105,Hello!\n"
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testAllReduceForTuple: Unit = {
+    /*
+     * All-reduce for tuple
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val reduceDs =
+      ds.reduce { (in1, in2) => (in1._1 + in2._1, in1._2 + in2._2, "Hello World") }
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "231,91,Hello World\n"
   }
-}
 
-object ReduceITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to ReduceProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
+  @Test
+  def testAllReduceForCustomTypes: Unit = {
+    /*
+     * All-reduce for custom types
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val reduceDs = ds
+      .reduce { (in1, in2) =>
+      in1.myInt += in2.myInt
+      in1.myLong += in2.myLong
+      in1.myString = "Hello!"
+      in1
     }
+    reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "91,210,Hello!"
+  }
+
+  @Test
+  def testReduceWithBroadcastSet: Unit = {
+    /*
+     * Reduce with broadcast set
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val intDs = CollectionDataSets.getIntDataSet(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val reduceDs = ds.groupBy(1).reduce(
+      new RichReduceFunction[(Int, Long, String)] {
+        private var f2Replace = ""
+
+        override def open(config: Configuration) {
+          val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+          f2Replace = ints.sum + ""
+        }
 
-    configs.asJavaCollection
+        override def reduce(
+                             in1: (Int, Long, String),
+                             in2: (Int, Long, String)): (Int, Long, String) = {
+          (in1._1 + in2._1, in1._2, f2Replace)
+        }
+      }).withBroadcastSet(intDs, "ints")
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n"
   }
-}
 
+  @Test
+  def testReduceWithUDFThatReturnsTheSecondInputObject: Unit = {
+    /*
+     * Reduce with UDF that returns the second input object (check mutable object handling)
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).map (t => MutableTuple3(t._1, t._2, t._3))
+    val reduceDs = ds.groupBy(1).reduce(
+      new RichReduceFunction[MutableTuple3[Int, Long, String]] {
+        override def reduce(
+                             in1: MutableTuple3[Int, Long, String],
+                             in2: MutableTuple3[Int, Long, String]): MutableTuple3[Int, Long,
+          String] = {
+          in2._1 = in1._1 + in2._1
+          in2._3 = "Hi again!"
+          in2
+        }
+      })
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "5,2,Hi again!\n" + "15,3,Hi again!\n" + "34,4,Hi again!\n" + "65,5," +
+      "Hi again!\n" + "111,6,Hi again!\n"
+  }
+
+  @Test
+  def testReduceWithATupleReturningKeySelector: Unit = {
+    /*
+     * Reduce with a Tuple-returning KeySelector
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val reduceDs = ds.groupBy(t => (t._1, t._5))
+      .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
+      "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
+      "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
+  }
+
+  @Test
+  def testReduceOnGroupedDSByExpressionKey: Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get5TupleDataSet(env)
+    val reduceDs = ds.groupBy("_5", "_1")
+      .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
+    reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
+      "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
+      "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index 128135e..5e456e0 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -18,126 +18,96 @@
 
 package org.apache.flink.api.scala.operators
 
-import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
-/**
- * These tests are copied from [[AggregateITCase]] replacing calls to aggregate with calls to sum,
- * min, and max
- */
-object SumMinMaxProgs {
-  var NUM_PROGRAMS: Int = 3
-
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        // Full aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-
-        val aggregateDs = ds
-          .sum(0)
-          .andMax(1)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map{ t => (t._1, t._2) }
-
-        aggregateDs.writeAsCsv(resultPath)
 
-        env.execute()
+@RunWith(classOf[Parameterized])
+class SumMinMaxITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
-        // return expected result
-        "231,6\n"
+  @Rule
+  def tempFolder = _tempFolder
 
-      case 2 =>
-        // Grouped aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
 
-        val aggregateDs = ds
-          .groupBy(1)
-          .sum(0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => (t._2, t._1) }
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
 
-        aggregateDs.writeAsCsv(resultPath)
+  @Test
+  def testFullAggregate: Unit = {
+    // Full aggregate
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
 
-        env.execute()
+    val aggregateDs = ds
+      .sum(0)
+      .andMax(1)
+      // Ensure aggregate operator correctly copies other fields
+      .filter(_._3 != null)
+      .map{ t => (t._1, t._2) }
 
-        // return expected result
-        "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
-      case 3 =>
-        // Nested aggregate
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
+    env.execute()
 
-        val aggregateDs = ds
-          .groupBy(1)
-          .min(0)
-          .min(0)
-          // Ensure aggregate operator correctly copies other fields
-          .filter(_._3 != null)
-          .map { t => new Tuple1(t._1) }
+    expected = "231,6\n"
+  }
 
-        aggregateDs.writeAsCsv(resultPath)
+  @Test
+  def testGroupedAggregate: Unit = {
+    // Grouped aggregate
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
 
-        env.execute()
+    val aggregateDs = ds
+      .groupBy(1)
+      .sum(0)
+      // Ensure aggregate operator correctly copies other fields
+      .filter(_._3 != null)
+      .map { t => (t._2, t._1) }
 
-        // return expected result
-        "1\n"
+    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
+    env.execute()
 
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
   }
-}
-
 
-@RunWith(classOf[Parameterized])
-class SumMinMaxITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  @Test
+  def testNestedAggregate: Unit = {
+    // Nested aggregate
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
+    val aggregateDs = ds
+      .groupBy(1)
+      .min(0)
+      .min(0)
+      // Ensure aggregate operator correctly copies other fields
+      .filter(_._3 != null)
+      .map { t => new Tuple1(t._1) }
 
-  protected def testProgram(): Unit = {
-    expectedResult = SumMinMaxProgs.runProgram(curProgId, resultPath)
-  }
+    aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
+    env.execute()
 
-object SumMinMaxITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to SumMinMaxProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
-
-    configs.asJavaCollection
+    expected = "1\n"
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index 5304ffa..3eed128 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -18,22 +18,21 @@
 package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.Assert
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit._
+import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.flink.api.scala._
 
-
-object UnionProgs {
-  var NUM_PROGRAMS: Int = 3
+@RunWith(classOf[Parameterized])
+class UnionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = null
+  private val _tempFolder = new TemporaryFolder()
 
   private final val FULL_TUPLE_3_STRING: String = "1,1,Hi\n" + "2,2,Hello\n" + "3,2," +
     "Hello world\n" + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3," +
@@ -42,85 +41,61 @@ object UnionProgs {
     "Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6," +
     "Comment#12\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
 
-  def runProgram(progId: Int, resultPath: String): String = {
-    progId match {
-      case 1 =>
-        /*
-         * Union of 2 Same Data Sets
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
-        unionDs.writeAsCsv(resultPath)
-        env.execute()
-        FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
+  @Rule
+  def tempFolder = _tempFolder
 
-      case 2 =>
-        /*
-         * Union of 5 same Data Sets, with multiple unions
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val ds = CollectionDataSets.get3TupleDataSet(env)
-        val unionDs = ds
-          .union(CollectionDataSets.get3TupleDataSet(env))
-          .union(CollectionDataSets.get3TupleDataSet(env))
-          .union(CollectionDataSets.get3TupleDataSet(env))
-          .union(CollectionDataSets.get3TupleDataSet(env))
-        unionDs.writeAsCsv(resultPath)
-        env.execute()
-        FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
-          FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
-
-      case 3 =>
-        /*
-         * Test on union with empty dataset
-         */
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        // Don't know how to make an empty result in an other way than filtering it
-        val empty = CollectionDataSets.get3TupleDataSet(env).filter( t => false )
-        val unionDs = CollectionDataSets.get3TupleDataSet(env).union(empty)
-        unionDs.writeAsCsv(resultPath)
-        env.execute()
-        FULL_TUPLE_3_STRING
-
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
   }
-}
-
 
-@RunWith(classOf[Parameterized])
-class UnionITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
-
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
   }
 
-  protected def testProgram(): Unit = {
-    expectedResult = UnionProgs.runProgram(curProgId, resultPath)
+  @Test
+  def testUnionOf2IdenticalDS: Unit = {
+    /*
+     * Union of 2 Same Data Sets
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
+    unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
+  @Test
+  def testUnionOf5IdenticalDSWithMultipleUnions: Unit = {
+    /*
+     * Union of 5 same Data Sets, with multiple unions
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val unionDs = ds
+      .union(CollectionDataSets.get3TupleDataSet(env))
+      .union(CollectionDataSets.get3TupleDataSet(env))
+      .union(CollectionDataSets.get3TupleDataSet(env))
+      .union(CollectionDataSets.get3TupleDataSet(env))
+    unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
+      FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
   }
-}
-
-object UnionITCase {
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to UnionProgs.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
 
-    configs.asJavaCollection
+  @Test
+  def testUnionWithEmptyDS: Unit = {
+    /*
+     * Test on union with empty dataset
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // Don't know how to make an empty result in an other way than filtering it
+    val empty = CollectionDataSets.get3TupleDataSet(env).filter( t => false )
+    val unionDs = CollectionDataSets.get3TupleDataSet(env).union(empty)
+    unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = FULL_TUPLE_3_STRING
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 4024304..104a440 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -17,161 +17,152 @@
  */
 package org.apache.flink.api.scala.runtime
 
-import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit._
+import org.junit.rules.TemporaryFolder
 
 import org.apache.flink.api.scala._
-
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 
 @RunWith(classOf[Parameterized])
-class ScalaSpecialTypesITCase(config: Configuration) extends JavaProgramTestBase(config) {
+class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
 
-  private var curProgId: Int = config.getInteger("ProgramId", -1)
-  private var resultPath: String = null
-  private var expectedResult: String = null
+  val _tempFolder = new TemporaryFolder()
 
-  protected override def preSubmit(): Unit = {
-    resultPath = getTempDirPath("result")
-  }
+  @Rule def tempFolder = _tempFolder
 
-  protected def testProgram(): Unit = {
-    expectedResult = curProgId match {
-      case 1 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val nums = env.fromElements(1, 2, 1, 2)
+  @Test
+  def testEither1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val nums = env.fromElements(1, 2, 1, 2)
 
-        val eithers = nums.map(_ match {
-          case 1 => Left(10)
-          case 2 => Right(20)
-        })
+    val eithers = nums.map(_ match {
+      case 1 => Left(10)
+      case 2 => Right(20)
+    })
 
-        val result = eithers.map(_ match {
-          case Left(i) => i
-          case Right(i) => i
-        }).reduce(_ + _).writeAsText(resultPath)
+    val resultPath = tempFolder.newFile().toPath.toUri.toString
 
-        env.execute()
+    val result = eithers.map{
+      _ match {
+      case Left(i) => i
+      case Right(i) => i
+    }}.reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
-        "60"
+    env.execute()
 
-      case 2 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val nums = env.fromElements(1, 2, 1, 2)
+    compareResultsByLinesInMemory("60", resultPath)
+  }
 
-        val eithers = nums.map(_ match {
-          case 1 => Left(10)
-          case 2 => Left(20)
-        })
+  @Test
+  def testEither2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val nums = env.fromElements(1, 2, 1, 2)
 
-        val result = eithers.map(_ match {
-          case Left(i) => i
-        }).reduce(_ + _).writeAsText(resultPath)
+    val eithers = nums.map(_ match {
+      case 1 => Left(10)
+      case 2 => Left(20)
+    })
 
-        env.execute()
+    val resultPath = tempFolder.newFile().toPath.toUri.toString
 
-        "60"
+    val result = eithers.map(_ match {
+      case Left(i) => i
+    }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
-      case 3 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val nums = env.fromElements(1, 2, 1, 2)
+    env.execute()
 
-        val eithers = nums.map(_ match {
-          case 1 => Right(10)
-          case 2 => Right(20)
-        })
+    compareResultsByLinesInMemory("60", resultPath)
+  }
 
-        val result = eithers.map(_ match {
-          case Right(i) => i
-        }).reduce(_ + _).writeAsText(resultPath)
+  @Test
+  def testEither3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val nums = env.fromElements(1, 2, 1, 2)
 
-        env.execute()
+    val eithers = nums.map(_ match {
+      case 1 => Right(10)
+      case 2 => Right(20)
+    })
 
-        "60"
+    val resultPath = tempFolder.newFile().toPath.toUri.toString
 
-      case 4 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val nums = env.fromElements(1, 2, 1, 2)
+    val result = eithers.map(_ match {
+      case Right(i) => i
+    }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
-        val eithers = nums.map(_ match {
-          case 1 => Some(10)
-          case 2 => None
-        })
+    env.execute()
 
-        val result = eithers.map(_ match {
-          case Some(i) => i
-          case None => 20
-        }).reduce(_ + _).writeAsText(resultPath)
+    compareResultsByLinesInMemory("60", resultPath)
+  }
 
-        env.execute()
+  @Test
+  def testOption1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val nums = env.fromElements(1, 2, 1, 2)
 
-        "60"
+    val eithers = nums.map(_ match {
+      case 1 => Some(10)
+      case 2 => None
+    })
 
-      case 5 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val nums = env.fromElements(1, 2, 1, 2)
+    val resultPath = tempFolder.newFile().toPath.toUri.toString
 
-        val eithers = nums.map(_ match {
-          case 1 => Some(10)
-          case 2 => Some(20)
-        })
 
-        val result = eithers.map(_ match {
-          case Some(i) => i
-        }).reduce(_ + _).writeAsText(resultPath)
+    val result = eithers.map(_ match {
+      case Some(i) => i
+      case None => 20
+    }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
-        env.execute()
+    env.execute()
 
-        "60"
+    compareResultsByLinesInMemory("60", resultPath)
+  }
 
-      case 6 =>
-        val env = ExecutionEnvironment.getExecutionEnvironment
-        val nums = env.fromElements(1, 2, 1, 2)
+  @Test
+  def testOption2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val nums = env.fromElements(1, 2, 1, 2)
 
-        val eithers = nums.map(_ match {
-          case 1 => None
-          case 2 => None
-        })
+    val eithers = nums.map(_ match {
+      case 1 => Some(10)
+      case 2 => Some(20)
+    })
 
-        val result = eithers.map(_ match {
-          case None => 20
-        }).reduce(_ + _).writeAsText(resultPath)
+    val resultPath = tempFolder.newFile().toPath.toUri.toString
 
-        env.execute()
+    val result = eithers.map(_ match {
+      case Some(i) => i
+    }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
-        "80"
+    env.execute()
 
-      case _ =>
-        throw new IllegalArgumentException("Invalid program id")
-    }
+    compareResultsByLinesInMemory("60", resultPath)
   }
 
-  protected override def postSubmit(): Unit = {
-    compareResultsByLinesInMemory(expectedResult, resultPath)
-  }
-}
+  @Test
+  def testOption3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val nums = env.fromElements(1, 2, 1, 2)
+
+    val eithers = nums.map(_ match {
+      case 1 => None
+      case 2 => None
+    })
+
+    val resultPath = tempFolder.newFile().toPath.toUri.toString
 
-object ScalaSpecialTypesITCase {
-  var NUM_PROGRAMS: Int = 6
+    val result = eithers.map(_ match {
+      case None => 20
+    }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
 
-  @Parameters
-  def getConfigurations: java.util.Collection[Array[AnyRef]] = {
-    val configs = mutable.MutableList[Array[AnyRef]]()
-    for (i <- 1 to ScalaSpecialTypesITCase.NUM_PROGRAMS) {
-      val config = new Configuration()
-      config.setInteger("ProgramId", i)
-      configs += Array(config)
-    }
+    env.execute()
 
-    configs.asJavaCollection
+    compareResultsByLinesInMemory("80", resultPath)
   }
 }