Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:04 UTC
[68/82] [abbrv] incubator-flink git commit: Change integration tests to reuse cluster in order to save startup and shutdown time.
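Every file in this commit follows the same migration pattern: the old parameterized JavaProgramTestBase subclass that dispatched on a "ProgramId" is replaced by a class extending MultipleProgramsTestBase, which reuses a shared test cluster across test methods instead of starting and stopping one per program. Each test method writes its output into a file under a JUnit TemporaryFolder (with WriteMode.OVERWRITE, since the path is reused) and sets an expected string that an @After hook compares against the written result. A minimal sketch of that shape, using a hypothetical SampleITCase and an inline data set rather than any of the real test classes below:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

@RunWith(classOf[Parameterized])
class SampleITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
  private var resultPath: String = null
  private var expected: String = null
  private val _tempFolder = new TemporaryFolder()

  // the rule must be exposed publicly for JUnit to manage the temp folder
  @Rule
  def tempFolder = _tempFolder

  @Before
  def before(): Unit = {
    // each test writes into a fresh file inside the managed temp folder
    resultPath = tempFolder.newFile().toURI.toString
  }

  @After
  def after(): Unit = {
    // verification moves from postSubmit() into a plain JUnit hook
    compareResultsByLinesInMemory(expected, resultPath)
  }

  @Test
  def testIdentityMap(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // small inline data set keeps the sketch self-contained
    val ds = env.fromElements("a", "b", "c")
    // OVERWRITE because the shared cluster reuses the same result path
    ds.map(s => s).writeAsText(resultPath, WriteMode.OVERWRITE)
    env.execute()
    expected = "a\n" + "b\n" + "c\n"
  }
}

The real classes changed below (MapITCase, PartitionITCase, ReduceITCase, SumMinMaxITCase, UnionITCase, ScalaSpecialTypesITCase) follow exactly this structure, with one @Test method per former program id.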
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 64f04cb..78426ef 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -21,222 +21,212 @@ import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.apache.flink.util.Collector
-import org.junit.Assert
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit._
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.flink.api.scala._
+@RunWith(classOf[Parameterized])
+class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+ private var resultPath: String = null
+ private var expected: String = null
+ private val _tempFolder = new TemporaryFolder()
-object MapProgs {
- var NUM_PROGRAMS: Int = 9
-
- def runProgram(progId: Int, resultPath: String): String = {
- progId match {
- case 1 =>
- /*
- * Test identity map with basic type
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getStringDataSet(env)
- val identityMapDs = ds.map( t => t)
- identityMapDs.writeAsText(resultPath)
- env.execute()
- "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
- ".\n" + "Luke Skywalker\n" + "Random comment\n" + "LOL\n"
-
- case 2 =>
- /*
- * Test identity map with a tuple
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val identityMapDs = ds.map( t => t )
- identityMapDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
- case 3 =>
- /*
- * Test type conversion mapper (Custom -> Tuple)
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val typeConversionMapDs = ds.map( c => (c.myInt, c.myLong, c.myString) )
- typeConversionMapDs.writeAsCsv(resultPath)
- env.execute()
- "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
- "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
- "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
- "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
- "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
- "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
-
- case 4 =>
- /*
- * Test type conversion mapper (Tuple -> Basic)
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val typeConversionMapDs = ds.map(_._3)
- typeConversionMapDs.writeAsText(resultPath)
- env.execute()
- "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
- ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
- "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
- "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
- "Comment#14\n" + "Comment#15\n"
-
- case 5 =>
- /*
- * Test mapper on tuple - Increment Integer field, reorder second and third fields
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val tupleMapDs = ds.map( t => (t._1 + 1, t._3, t._2) )
- tupleMapDs.writeAsCsv(resultPath)
- env.execute()
- "2,Hi,1\n" + "3,Hello,2\n" + "4,Hello world,2\n" + "5,Hello world, how are you?," +
- "3\n" + "6,I am fine.,3\n" + "7,Luke Skywalker,3\n" + "8,Comment#1,4\n" + "9,Comment#2," +
- "4\n" + "10,Comment#3,4\n" + "11,Comment#4,4\n" + "12,Comment#5,5\n" + "13,Comment#6," +
- "5\n" + "14,Comment#7,5\n" + "15,Comment#8,5\n" + "16,Comment#9,5\n" + "17,Comment#10," +
- "6\n" + "18,Comment#11,6\n" + "19,Comment#12,6\n" + "20,Comment#13,6\n" + "21," +
- "Comment#14,6\n" + "22,Comment#15,6\n"
-
- case 6 =>
- /*
- * Test mapper on Custom - lowercase myString
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val customMapDs = ds.map { c => c.myString = c.myString.toLowerCase; c }
- customMapDs.writeAsText(resultPath)
- env.execute()
- "1,0,hi\n" + "2,1,hello\n" + "2,2,hello world\n" + "3,3,hello world, " +
- "how are you?\n" + "3,4,i am fine.\n" + "3,5,luke skywalker\n" + "4,6," +
- "comment#1\n" + "4,7,comment#2\n" + "4,8,comment#3\n" + "4,9,comment#4\n" + "5,10," +
- "comment#5\n" + "5,11,comment#6\n" + "5,12,comment#7\n" + "5,13,comment#8\n" + "5,14," +
- "comment#9\n" + "6,15,comment#10\n" + "6,16,comment#11\n" + "6,17,comment#12\n" + "6," +
- "18,comment#13\n" + "6,19,comment#14\n" + "6,20,comment#15\n"
-
- case 7 =>
- /*
- * Test mapper if UDF returns input object - increment first field of a tuple
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).map {
- t => MutableTuple3(t._1, t._2, t._3)
- }
- val inputObjMapDs = ds.map { t => t._1 = t._1 + 1; t }
- inputObjMapDs.writeAsCsv(resultPath)
- env.execute()
- "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n" + "5,3,Hello world, " +
- "how are you?\n" + "6,3,I am fine.\n" + "7,3,Luke Skywalker\n" + "8,4," +
- "Comment#1\n" + "9,4,Comment#2\n" + "10,4,Comment#3\n" + "11,4,Comment#4\n" + "12,5," +
- "Comment#5\n" + "13,5,Comment#6\n" + "14,5,Comment#7\n" + "15,5,Comment#8\n" + "16,5," +
- "Comment#9\n" + "17,6,Comment#10\n" + "18,6,Comment#11\n" + "19,6,Comment#12\n" + "20," +
- "6,Comment#13\n" + "21,6,Comment#14\n" + "22,6,Comment#15\n"
-
- case 8 =>
- /*
- * Test map with broadcast set
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ints = CollectionDataSets.getIntDataSet(env)
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val bcMapDs = ds.map(
- new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
- var f2Replace = 0
- override def open(config: Configuration): Unit = {
- val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
- f2Replace = ints.sum
- }
- override def map(in: (Int, Long, String)): (Int, Long, String) = {
- in.copy(_1 = f2Replace)
- }
- }).withBroadcastSet(ints, "ints")
- bcMapDs.writeAsCsv(resultPath)
- env.execute()
- "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
- "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
- "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
- "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
- "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
- "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
-
- case 9 =>
- /*
- * Test passing configuration object.
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getSmall3TupleDataSet(env)
- val conf = new Configuration
- val testKey = "testVariable"
- val testValue = 666
- conf.setInteger(testKey, testValue)
- val bcMapDs = ds.map(
- new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
- override def open(config: Configuration): Unit = {
- val fromConfig = config.getInteger(testKey, -1)
- Assert.assertEquals(testValue, fromConfig)
- }
- override def map(in: (Int, Long, String)): (Int, Long, String) = {
- in
- }
- }).withParameters(conf)
- bcMapDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world"
-
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
- }
+ @Rule
+ def tempFolder = _tempFolder
+
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
}
-}
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
-@RunWith(classOf[Parameterized])
-class MapITCase(config: Configuration) extends JavaProgramTestBase(config) {
+ @Test
+ def testIdentityMapperWithBasicType: Unit = {
+ /*
+ * Test identity map with basic type
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getStringDataSet(env)
+ val identityMapDs = ds.map( t => t)
+ identityMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
+ ".\n" + "Luke Skywalker\n" + "Random comment\n" + "LOL\n"
+ }
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ @Test
+ def testIdentityMapperWithTuple: Unit = {
+ /*
+ * Test identity map with a tuple
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val identityMapDs = ds.map( t => t )
+ identityMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ }
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
+ @Test
+ def testTypeConversionMapperCustomToTuple: Unit = {
+ /*
+ * Test type conversion mapper (Custom -> Tuple)
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getCustomTypeDataSet(env)
+ val typeConversionMapDs = ds.map( c => (c.myInt, c.myLong, c.myString) )
+ typeConversionMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,0,Hi\n" + "2,1,Hello\n" + "2,2,Hello world\n" + "3,3,Hello world, " +
+ "how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + "4,6," +
+ "Comment#1\n" + "4,7,Comment#2\n" + "4,8,Comment#3\n" + "4,9,Comment#4\n" + "5,10," +
+ "Comment#5\n" + "5,11,Comment#6\n" + "5,12,Comment#7\n" + "5,13,Comment#8\n" + "5,14," +
+ "Comment#9\n" + "6,15,Comment#10\n" + "6,16,Comment#11\n" + "6,17,Comment#12\n" + "6," +
+ "18,Comment#13\n" + "6,19,Comment#14\n" + "6,20,Comment#15\n"
}
- protected def testProgram(): Unit = {
- expectedResult = MapProgs.runProgram(curProgId, resultPath)
+ @Test
+ def testTypeConversionMapperTupleToBasic: Unit = {
+ /*
+ * Test type conversion mapper (Tuple -> Basic)
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val typeConversionMapDs = ds.map(_._3)
+ typeConversionMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + "I am fine" +
+ ".\n" + "Luke Skywalker\n" + "Comment#1\n" + "Comment#2\n" + "Comment#3\n" +
+ "Comment#4\n" + "Comment#5\n" + "Comment#6\n" + "Comment#7\n" + "Comment#8\n" +
+ "Comment#9\n" + "Comment#10\n" + "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
+ "Comment#14\n" + "Comment#15\n"
}
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ @Test
+ def testMapperOnTupleIncrementFieldReorderSecondAndThirdFields: Unit = {
+ /*
+ * Test mapper on tuple - Increment Integer field, reorder second and third fields
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val tupleMapDs = ds.map( t => (t._1 + 1, t._3, t._2) )
+ tupleMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "2,Hi,1\n" + "3,Hello,2\n" + "4,Hello world,2\n" + "5,Hello world, how are you?," +
+ "3\n" + "6,I am fine.,3\n" + "7,Luke Skywalker,3\n" + "8,Comment#1,4\n" + "9,Comment#2," +
+ "4\n" + "10,Comment#3,4\n" + "11,Comment#4,4\n" + "12,Comment#5,5\n" + "13,Comment#6," +
+ "5\n" + "14,Comment#7,5\n" + "15,Comment#8,5\n" + "16,Comment#9,5\n" + "17,Comment#10," +
+ "6\n" + "18,Comment#11,6\n" + "19,Comment#12,6\n" + "20,Comment#13,6\n" + "21," +
+ "Comment#14,6\n" + "22,Comment#15,6\n"
+ }
+
+ @Test
+ def testMapperOnCustomLowercaseString: Unit = {
+ /*
+ * Test mapper on Custom - lowercase myString
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getCustomTypeDataSet(env)
+ val customMapDs = ds.map { c => c.myString = c.myString.toLowerCase; c }
+ customMapDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,0,hi\n" + "2,1,hello\n" + "2,2,hello world\n" + "3,3,hello world, " +
+ "how are you?\n" + "3,4,i am fine.\n" + "3,5,luke skywalker\n" + "4,6," +
+ "comment#1\n" + "4,7,comment#2\n" + "4,8,comment#3\n" + "4,9,comment#4\n" + "5,10," +
+ "comment#5\n" + "5,11,comment#6\n" + "5,12,comment#7\n" + "5,13,comment#8\n" + "5,14," +
+ "comment#9\n" + "6,15,comment#10\n" + "6,16,comment#11\n" + "6,17,comment#12\n" + "6," +
+ "18,comment#13\n" + "6,19,comment#14\n" + "6,20,comment#15\n"
}
-}
-object MapITCase {
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to MapProgs.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
+ @Test
+ def testMapperIfUDFReturnsInputObjectIncrementFirstFieldOfTuple: Unit = {
+ /*
+ * Test mapper if UDF returns input object - increment first field of a tuple
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).map {
+ t => MutableTuple3(t._1, t._2, t._3)
}
+ val inputObjMapDs = ds.map { t => t._1 = t._1 + 1; t }
+ inputObjMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n" + "5,3,Hello world, " +
+ "how are you?\n" + "6,3,I am fine.\n" + "7,3,Luke Skywalker\n" + "8,4," +
+ "Comment#1\n" + "9,4,Comment#2\n" + "10,4,Comment#3\n" + "11,4,Comment#4\n" + "12,5," +
+ "Comment#5\n" + "13,5,Comment#6\n" + "14,5,Comment#7\n" + "15,5,Comment#8\n" + "16,5," +
+ "Comment#9\n" + "17,6,Comment#10\n" + "18,6,Comment#11\n" + "19,6,Comment#12\n" + "20," +
+ "6,Comment#13\n" + "21,6,Comment#14\n" + "22,6,Comment#15\n"
+ }
- configs.asJavaCollection
+ @Test
+ def testMapWithBroadcastSet: Unit = {
+ /*
+ * Test map with broadcast set
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ints = CollectionDataSets.getIntDataSet(env)
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val bcMapDs = ds.map(
+ new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
+ var f2Replace = 0
+ override def open(config: Configuration): Unit = {
+ val ints = getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+ f2Replace = ints.sum
+ }
+ override def map(in: (Int, Long, String)): (Int, Long, String) = {
+ in.copy(_1 = f2Replace)
+ }
+ }).withBroadcastSet(ints, "ints")
+ bcMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "55,1,Hi\n" + "55,2,Hello\n" + "55,2,Hello world\n" + "55,3,Hello world, " +
+ "how are you?\n" + "55,3,I am fine.\n" + "55,3,Luke Skywalker\n" + "55,4," +
+ "Comment#1\n" + "55,4,Comment#2\n" + "55,4,Comment#3\n" + "55,4,Comment#4\n" + "55,5," +
+ "Comment#5\n" + "55,5,Comment#6\n" + "55,5,Comment#7\n" + "55,5,Comment#8\n" + "55,5," +
+ "Comment#9\n" + "55,6,Comment#10\n" + "55,6,Comment#11\n" + "55,6,Comment#12\n" + "55," +
+ "6,Comment#13\n" + "55,6,Comment#14\n" + "55,6,Comment#15\n"
}
-}
+ @Test
+ def testPassingConfigurationObject: Unit = {
+ /*
+ * Test passing configuration object.
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getSmall3TupleDataSet(env)
+ val conf = new Configuration
+ val testKey = "testVariable"
+ val testValue = 666
+ conf.setInteger(testKey, testValue)
+ val bcMapDs = ds.map(
+ new RichMapFunction[(Int, Long, String), (Int, Long, String)] {
+ override def open(config: Configuration): Unit = {
+ val fromConfig = config.getInteger(testKey, -1)
+ Assert.assertEquals(testValue, fromConfig)
+ }
+ override def map(in: (Int, Long, String)): (Int, Long, String) = {
+ in
+ }
+ }).withParameters(conf)
+ bcMapDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 5d80830..35c0e93 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -20,199 +20,183 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.flink.api.scala._
+@RunWith(classOf[Parameterized])
+class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+ private var resultPath: String = null
+ private var expected: String = null
+ private val _tempFolder = new TemporaryFolder()
-object PartitionProgs {
- var NUM_PROGRAMS: Int = 7
-
- def runProgram(progId: Int, resultPath: String, onCollection: Boolean): String = {
- progId match {
- case 1 =>
- /*
- * Test hash partition by tuple field
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
+ @Rule
+ def tempFolder = _tempFolder
- val unique = ds.partitionByHash(1).mapPartition( _.map(_._2).toSet )
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
- unique.writeAsText(resultPath)
- env.execute()
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
- "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ @Test
+ def testHashPartitionByTupleField: Unit = {
+ /*
+ * Test hash partition by tuple field
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
- case 2 =>
- /*
- * Test hash partition by key selector
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val unique = ds.partitionByHash( _._2 ).mapPartition( _.map(_._2).toSet )
+ val unique = ds.partitionByHash(1).mapPartition( _.map(_._2).toSet )
- unique.writeAsText(resultPath)
- env.execute()
- "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
- case 3 =>
- /*
- * Test forced rebalancing
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.generateSequence(1, 3000)
+ expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ }
- val skewed = ds.filter(_ > 780)
- val rebalanced = skewed.rebalance()
+ @Test
+ def testHashPartitionByKeySelector: Unit = {
+ /*
+ * Test hash partition by key selector
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val unique = ds.partitionByHash( _._2 ).mapPartition( _.map(_._2).toSet )
+
+ unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+ }
- val countsInPartition = rebalanced.map( new RichMapFunction[Long, (Int, Long)] {
- def map(in: Long) = {
- (getRuntimeContext.getIndexOfThisSubtask, 1)
- }
- })
- .groupBy(0)
- .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
- // round counts to mitigate runtime scheduling effects (lazy split assignment)
- .map { in => (in._1, in._2 / 10) }
-
- countsInPartition.writeAsText(resultPath)
- env.execute()
-
- val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10
- var result = ""
- for (i <- 0 until env.getDegreeOfParallelism) {
- result += "(" + i + "," + numPerPartition + ")\n"
- }
- result
-
- case 4 =>
- // Verify that mapPartition operation after repartition picks up correct
- // DOP
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- env.setDegreeOfParallelism(1)
-
- val unique = ds.partitionByHash(1)
- .setParallelism(4)
- .mapPartition( _.map(_._2).toSet )
-
- unique.writeAsText(resultPath)
- env.execute()
-
- "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-
- case 5 =>
- // Verify that map operation after repartition picks up correct
- // DOP
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- env.setDegreeOfParallelism(1)
-
- val count = ds.partitionByHash(0).setParallelism(4).map(
- new RichMapFunction[(Int, Long, String), Tuple1[Int]] {
- var first = true
- override def map(in: (Int, Long, String)): Tuple1[Int] = {
- // only output one value with count 1
- if (first) {
- first = false
- Tuple1(1)
- } else {
- Tuple1(0)
- }
- }
- }).sum(0)
-
- count.writeAsText(resultPath)
- env.execute()
-
- if (onCollection) "(1)\n" else "(4)\n"
-
- case 6 =>
- // Verify that filter operation after repartition picks up correct
- // DOP
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- env.setDegreeOfParallelism(1)
-
- val count = ds.partitionByHash(0).setParallelism(4).filter(
- new RichFilterFunction[(Int, Long, String)] {
- var first = true
- override def filter(in: (Int, Long, String)): Boolean = {
- // only output one value with count 1
- if (first) {
- first = false
- true
- } else {
- false
- }
- }
- })
- .map( _ => Tuple1(1)).sum(0)
-
- count.writeAsText(resultPath)
- env.execute()
-
- if (onCollection) "(1)\n" else "(4)\n"
-
- case 7 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- env.setDegreeOfParallelism(3)
- val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
- val uniqLongs = ds
- .partitionByHash("nestedPojo.longNumber")
- .setParallelism(4)
- .mapPartition( _.map(_.nestedPojo.longNumber).toSet )
-
- uniqLongs.writeAsText(resultPath)
- env.execute()
- "10000\n" + "20000\n" + "30000\n"
-
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
+ @Test
+ def testForcedRebalancing: Unit = {
+ /*
+ * Test forced rebalancing
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = env.generateSequence(1, 3000)
+
+ val skewed = ds.filter(_ > 780)
+ val rebalanced = skewed.rebalance()
+
+ val countsInPartition = rebalanced.map( new RichMapFunction[Long, (Int, Long)] {
+ def map(in: Long) = {
+ (getRuntimeContext.getIndexOfThisSubtask, 1)
+ }
+ })
+ .groupBy(0)
+ .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
+ // round counts to mitigate runtime scheduling effects (lazy split assignment)
+ .map { in => (in._1, in._2 / 10) }
+
+ countsInPartition.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+
+ val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10
+ expected = ""
+ for (i <- 0 until env.getDegreeOfParallelism) {
+ expected += "(" + i + "," + numPerPartition + ")\n"
}
}
-}
+ @Test
+ def testMapPartitionAfterRepartitionHasCorrectDOP: Unit = {
+ // Verify that mapPartition operation after repartition picks up correct
+ // DOP
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ env.setDegreeOfParallelism(1)
-@RunWith(classOf[Parameterized])
-class PartitionITCase(config: Configuration) extends JavaProgramTestBase(config) {
+ val unique = ds.partitionByHash(1)
+ .setParallelism(4)
+ .mapPartition( _.map(_._2).toSet )
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ unique.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
+ expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
}
- protected def testProgram(): Unit = {
- expectedResult = PartitionProgs.runProgram(curProgId, resultPath, isCollectionExecution)
- }
+ @Test
+ def testMapAfterRepartitionHasCorrectDOP: Unit = {
+ // Verify that map operation after repartition picks up correct
+ // DOP
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ env.setDegreeOfParallelism(1)
+
+ val count = ds.partitionByHash(0).setParallelism(4).map(
+ new RichMapFunction[(Int, Long, String), Tuple1[Int]] {
+ var first = true
+ override def map(in: (Int, Long, String)): Tuple1[Int] = {
+ // only output one value with count 1
+ if (first) {
+ first = false
+ Tuple1(1)
+ } else {
+ Tuple1(0)
+ }
+ }
+ }).sum(0)
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ count.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+
+ expected = if (mode == ExecutionMode.COLLECTION) "(1)\n" else "(4)\n"
}
-}
-object PartitionITCase {
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to PartitionProgs.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
- }
+ @Test
+ def testFilterAfterRepartitionHasCorrectDOP: Unit = {
+ // Verify that filter operation after repartition picks up correct
+ // DOP
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ env.setDegreeOfParallelism(1)
+
+ val count = ds.partitionByHash(0).setParallelism(4).filter(
+ new RichFilterFunction[(Int, Long, String)] {
+ var first = true
+ override def filter(in: (Int, Long, String)): Boolean = {
+ // only output one value with count 1
+ if (first) {
+ first = false
+ true
+ } else {
+ false
+ }
+ }
+ })
+ .map( _ => Tuple1(1)).sum(0)
+
+ count.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
- configs.asJavaCollection
+ expected = if (mode == ExecutionMode.COLLECTION) "(1)\n" else "(4)\n"
}
-}
+ @Test
+ def testPartitionNestedPojo: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ env.setDegreeOfParallelism(3)
+ val ds = CollectionDataSets.getDuplicatePojoDataSet(env)
+ val uniqLongs = ds
+ .partitionByHash("nestedPojo.longNumber")
+ .setParallelism(4)
+ .mapPartition( _.map(_.nestedPojo.longNumber).toSet )
+
+ uniqLongs.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "10000\n" + "20000\n" + "30000\n"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index 6f8af7e..5f63dc4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -21,216 +21,212 @@ import org.apache.flink.api.common.functions.RichReduceFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.flink.api.scala._
+@RunWith(classOf[Parameterized])
+class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+ private var resultPath: String = null
+ private var expected: String = null
+ private val _tempFolder = new TemporaryFolder()
-object ReduceProgs {
- var NUM_PROGRAMS: Int = 10
-
- def runProgram(progId: Int, resultPath: String): String = {
- progId match {
- case 1 =>
- /*
- * Reduce on tuples with key field selector
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val reduceDs = ds.groupBy(1)
- .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
-
- case 2 =>
- /*
- * Reduce on tuples with multiple key field selectors
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val reduceDs = ds.groupBy(4, 0)
- .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
- "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
- "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
- case 3 =>
- /*
- * Reduce on tuples with key extractor
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val reduceDs = ds.groupBy(_._2)
- .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
-
- case 4 =>
- /*
- * Reduce on custom type with key extractor
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val reduceDs = ds.groupBy(_.myInt)
- .reduce { (in1, in2) =>
- in1.myLong += in2.myLong
- in1.myString = "Hello!"
- in1
- }
- reduceDs.writeAsText(resultPath)
- env.execute()
- "1,0,Hi\n" + "2,3,Hello!\n" + "3,12,Hello!\n" + "4,30,Hello!\n" + "5,60," +
- "Hello!\n" + "6,105,Hello!\n"
-
- case 5 =>
- /*
- * All-reduce for tuple
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val reduceDs =
- ds.reduce { (in1, in2) => (in1._1 + in2._1, in1._2 + in2._2, "Hello World") }
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "231,91,Hello World\n"
-
- case 6 =>
- /*
- * All-reduce for custom types
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val reduceDs = ds
- .reduce { (in1, in2) =>
- in1.myInt += in2.myInt
- in1.myLong += in2.myLong
- in1.myString = "Hello!"
- in1
- }
- reduceDs.writeAsText(resultPath)
- env.execute()
- "91,210,Hello!"
-
- case 7 =>
- /*
- * Reduce with broadcast set
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val intDs = CollectionDataSets.getIntDataSet(env)
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val reduceDs = ds.groupBy(1).reduce(
- new RichReduceFunction[(Int, Long, String)] {
- private var f2Replace = ""
-
- override def open(config: Configuration) {
- val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
- f2Replace = ints.sum + ""
- }
-
- override def reduce(
- in1: (Int, Long, String),
- in2: (Int, Long, String)): (Int, Long, String) = {
- (in1._1 + in2._1, in1._2, f2Replace)
- }
- }).withBroadcastSet(intDs, "ints")
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hi\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n"
-
- case 8 =>
- /*
- * Reduce with UDF that returns the second input object (check mutable object handling)
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).map (t => MutableTuple3(t._1, t._2, t._3))
- val reduceDs = ds.groupBy(1).reduce(
- new RichReduceFunction[MutableTuple3[Int, Long, String]] {
- override def reduce(
- in1: MutableTuple3[Int, Long, String],
- in2: MutableTuple3[Int, Long, String]): MutableTuple3[Int, Long, String] = {
- in2._1 = in1._1 + in2._1
- in2._3 = "Hi again!"
- in2
- }
- })
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,Hi\n" + "5,2,Hi again!\n" + "15,3,Hi again!\n" + "34,4,Hi again!\n" + "65,5," +
- "Hi again!\n" + "111,6,Hi again!\n"
-
- case 9 =>
- /*
- * Reduce with a Tuple-returning KeySelector
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val reduceDs = ds.groupBy(t => (t._1, t._5))
- .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
- "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
- "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
- case 10 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get5TupleDataSet(env)
- val reduceDs = ds.groupBy("_5", "_1")
- .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
- reduceDs.writeAsCsv(resultPath)
- env.execute()
- "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
- "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
- "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
-
- case id =>
- throw new IllegalArgumentException(s"Invalid program id $id")
- }
+ @Rule
+ def tempFolder = _tempFolder
+
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
}
-}
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
-@RunWith(classOf[Parameterized])
-class ReduceITCase(config: Configuration) extends JavaProgramTestBase(config) {
+ @Test
+ def testReduceOnTuplesWithKeyFieldSelector: Unit = {
+ /*
+ * Reduce on tuples with key field selector
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val reduceDs = ds.groupBy(1)
+ .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
+ }
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ @Test
+ def testReduceOnTuplesWithMultipleKeyFieldSelectors: Unit = {
+ /*
+ * Reduce on tuples with multiple key field selectors
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val reduceDs = ds.groupBy(4, 0)
+ .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
+ "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
+ "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
+ }
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
+ @Test
+ def testReduceOnTuplesWithKeyExtractor: Unit = {
+ /*
+ * Reduce on tuples with key extractor
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val reduceDs = ds.groupBy(_._2)
+ .reduce { (in1, in2) => (in1._1 + in2._1, in1._2, "B-)") }
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "5,2,B-)\n" + "15,3,B-)\n" + "34,4,B-)\n" + "65,5,B-)\n" + "111,6,B-)\n"
}
- protected def testProgram(): Unit = {
- expectedResult = ReduceProgs.runProgram(curProgId, resultPath)
+ @Test
+ def testReduceOnCustomTypeWithKeyExtractor: Unit = {
+ /*
+ * Reduce on custom type with key extractor
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getCustomTypeDataSet(env)
+ val reduceDs = ds.groupBy(_.myInt)
+ .reduce { (in1, in2) =>
+ in1.myLong += in2.myLong
+ in1.myString = "Hello!"
+ in1
+ }
+ reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,0,Hi\n" + "2,3,Hello!\n" + "3,12,Hello!\n" + "4,30,Hello!\n" + "5,60," +
+ "Hello!\n" + "6,105,Hello!\n"
}
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ @Test
+ def testAllReduceForTuple: Unit = {
+ /*
+ * All-reduce for tuple
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val reduceDs =
+ ds.reduce { (in1, in2) => (in1._1 + in2._1, in1._2 + in2._2, "Hello World") }
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "231,91,Hello World\n"
}
-}
-object ReduceITCase {
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to ReduceProgs.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
+ @Test
+ def testAllReduceForCustomTypes: Unit = {
+ /*
+ * All-reduce for custom types
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.getCustomTypeDataSet(env)
+ val reduceDs = ds
+ .reduce { (in1, in2) =>
+ in1.myInt += in2.myInt
+ in1.myLong += in2.myLong
+ in1.myString = "Hello!"
+ in1
}
+ reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "91,210,Hello!"
+ }
+
+ @Test
+ def testReduceWithBroadcastSet: Unit = {
+ /*
+ * Reduce with broadcast set
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val intDs = CollectionDataSets.getIntDataSet(env)
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val reduceDs = ds.groupBy(1).reduce(
+ new RichReduceFunction[(Int, Long, String)] {
+ private var f2Replace = ""
+
+ override def open(config: Configuration) {
+ val ints = this.getRuntimeContext.getBroadcastVariable[Int]("ints").asScala
+ f2Replace = ints.sum + ""
+ }
- configs.asJavaCollection
+ override def reduce(
+ in1: (Int, Long, String),
+ in2: (Int, Long, String)): (Int, Long, String) = {
+ (in1._1 + in2._1, in1._2, f2Replace)
+ }
+ }).withBroadcastSet(intDs, "ints")
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "5,2,55\n" + "15,3,55\n" + "34,4,55\n" + "65,5,55\n" + "111,6,55\n"
}
-}
+ @Test
+ def testReduceWithUDFThatReturnsTheSecondInputObject: Unit = {
+ /*
+ * Reduce with UDF that returns the second input object (check mutable object handling)
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).map (t => MutableTuple3(t._1, t._2, t._3))
+ val reduceDs = ds.groupBy(1).reduce(
+ new RichReduceFunction[MutableTuple3[Int, Long, String]] {
+ override def reduce(
+ in1: MutableTuple3[Int, Long, String],
+ in2: MutableTuple3[Int, Long, String]): MutableTuple3[Int, Long,
+ String] = {
+ in2._1 = in1._1 + in2._1
+ in2._3 = "Hi again!"
+ in2
+ }
+ })
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "5,2,Hi again!\n" + "15,3,Hi again!\n" + "34,4,Hi again!\n" + "65,5," +
+ "Hi again!\n" + "111,6,Hi again!\n"
+ }
+
+ @Test
+ def testReduceWithATupleReturningKeySelector: Unit = {
+ /*
+ * Reduce with a Tuple-returning KeySelector
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val reduceDs = ds.groupBy(t => (t._1, t._5))
+ .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
+ "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
+ "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
+ }
+
+ @Test
+ def testReduceOnGroupedDSByExpressionKey: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get5TupleDataSet(env)
+ val reduceDs = ds.groupBy("_5", "_1")
+ .reduce { (in1, in2) => (in1._1, in1._2 + in2._2, 0, "P-)", in1._5) }
+ reduceDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,0,Hallo,1\n" + "2,3,2,Hallo Welt wie,1\n" + "2,2,1,Hallo Welt,2\n" + "3,9,0," +
+ "P-),2\n" + "3,6,5,BCD,3\n" + "4,17,0,P-),1\n" + "4,17,0,P-),2\n" + "5,11,10,GHI," +
+ "1\n" + "5,29,0,P-),2\n" + "5,25,0,P-),3\n"
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index 128135e..5e456e0 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -18,126 +18,96 @@
package org.apache.flink.api.scala.operators
-import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit.{Test, After, Before, Rule}
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.flink.api.scala._
-/**
- * These tests are copied from [[AggregateITCase]] replacing calls to aggregate with calls to sum,
- * min, and max
- */
-object SumMinMaxProgs {
- var NUM_PROGRAMS: Int = 3
-
- def runProgram(progId: Int, resultPath: String): String = {
- progId match {
- case 1 =>
- // Full aggregate
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
-
- val aggregateDs = ds
- .sum(0)
- .andMax(1)
- // Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
- .map{ t => (t._1, t._2) }
-
- aggregateDs.writeAsCsv(resultPath)
- env.execute()
+@RunWith(classOf[Parameterized])
+class SumMinMaxITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+ private var resultPath: String = null
+ private var expected: String = null
+ private val _tempFolder = new TemporaryFolder()
- // return expected result
- "231,6\n"
+ @Rule
+ def tempFolder = _tempFolder
- case 2 =>
- // Grouped aggregate
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
- val aggregateDs = ds
- .groupBy(1)
- .sum(0)
- // Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
- .map { t => (t._2, t._1) }
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
- aggregateDs.writeAsCsv(resultPath)
+ @Test
+ def testFullAggregate: Unit = {
+ // Full aggregate
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
- env.execute()
+ val aggregateDs = ds
+ .sum(0)
+ .andMax(1)
+ // Ensure aggregate operator correctly copies other fields
+ .filter(_._3 != null)
+ .map{ t => (t._1, t._2) }
- // return expected result
- "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+ aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- case 3 =>
- // Nested aggregate
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
+ env.execute()
- val aggregateDs = ds
- .groupBy(1)
- .min(0)
- .min(0)
- // Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
- .map { t => new Tuple1(t._1) }
+ expected = "231,6\n"
+ }
- aggregateDs.writeAsCsv(resultPath)
+ @Test
+ def testGroupedAggregate: Unit = {
+ // Grouped aggregate
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
- env.execute()
+ val aggregateDs = ds
+ .groupBy(1)
+ .sum(0)
+ // Ensure aggregate operator correctly copies other fields
+ .filter(_._3 != null)
+ .map { t => (t._2, t._1) }
- // return expected result
- "1\n"
+ aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
- }
+ expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
}
-}
-
-@RunWith(classOf[Parameterized])
-class SumMinMaxITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ @Test
+ def testNestedAggregate: Unit = {
+ // Nested aggregate
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
- }
+ val aggregateDs = ds
+ .groupBy(1)
+ .min(0)
+ .min(0)
+ // Ensure aggregate operator correctly copies other fields
+ .filter(_._3 != null)
+ .map { t => new Tuple1(t._1) }
- protected def testProgram(): Unit = {
- expectedResult = SumMinMaxProgs.runProgram(curProgId, resultPath)
- }
+ aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-}
+ env.execute()
-object SumMinMaxITCase {
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to SumMinMaxProgs.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
- }
-
- configs.asJavaCollection
+ expected = "1\n"
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index 5304ffa..3eed128 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -18,22 +18,21 @@
package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.Assert
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit._
+import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
import org.apache.flink.api.scala._
-
-object UnionProgs {
- var NUM_PROGRAMS: Int = 3
+@RunWith(classOf[Parameterized])
+class UnionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
+ private var resultPath: String = null
+ private var expected: String = null
+ private val _tempFolder = new TemporaryFolder()
private final val FULL_TUPLE_3_STRING: String = "1,1,Hi\n" + "2,2,Hello\n" + "3,2," +
"Hello world\n" + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3," +
@@ -42,85 +41,61 @@ object UnionProgs {
"Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6," +
"Comment#12\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- def runProgram(progId: Int, resultPath: String): String = {
- progId match {
- case 1 =>
- /*
- * Union of 2 Same Data Sets
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
- unionDs.writeAsCsv(resultPath)
- env.execute()
- FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
+ @Rule
+ def tempFolder = _tempFolder
- case 2 =>
- /*
- * Union of 5 same Data Sets, with multiple unions
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val unionDs = ds
- .union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env))
- unionDs.writeAsCsv(resultPath)
- env.execute()
- FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
- FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
-
- case 3 =>
- /*
- * Test on union with empty dataset
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- // Don't know how to make an empty result in an other way than filtering it
- val empty = CollectionDataSets.get3TupleDataSet(env).filter( t => false )
- val unionDs = CollectionDataSets.get3TupleDataSet(env).union(empty)
- unionDs.writeAsCsv(resultPath)
- env.execute()
- FULL_TUPLE_3_STRING
-
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
- }
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
}
-}
-
-@RunWith(classOf[Parameterized])
-class UnionITCase(config: Configuration) extends JavaProgramTestBase(config) {
-
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
-
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
}
- protected def testProgram(): Unit = {
- expectedResult = UnionProgs.runProgram(curProgId, resultPath)
+ @Test
+ def testUnionOf2IdenticalDS: Unit = {
+ /*
+ * Union of 2 Same Data Sets
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
+ unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
}
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
+ @Test
+ def testUnionOf5IdenticalDSWithMultipleUnions: Unit = {
+ /*
+ * Union of 5 same Data Sets, with multiple unions
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env)
+ val unionDs = ds
+ .union(CollectionDataSets.get3TupleDataSet(env))
+ .union(CollectionDataSets.get3TupleDataSet(env))
+ .union(CollectionDataSets.get3TupleDataSet(env))
+ .union(CollectionDataSets.get3TupleDataSet(env))
+ unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
+ FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
}
-}
-
-object UnionITCase {
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to UnionProgs.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
- }
- configs.asJavaCollection
+ @Test
+ def testUnionWithEmptyDS: Unit = {
+ /*
+ * Test on union with empty dataset
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ // Don't know how to make an empty result in an other way than filtering it
+ val empty = CollectionDataSets.get3TupleDataSet(env).filter( t => false )
+ val unionDs = CollectionDataSets.get3TupleDataSet(env).union(empty)
+ unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = FULL_TUPLE_3_STRING
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 4024304..104a440 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -17,161 +17,152 @@
*/
package org.apache.flink.api.scala.runtime
-import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.test.util.JavaProgramTestBase
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.ExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase}
+import org.junit._
+import org.junit.rules.TemporaryFolder
import org.apache.flink.api.scala._
-
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
@RunWith(classOf[Parameterized])
-class ScalaSpecialTypesITCase(config: Configuration) extends JavaProgramTestBase(config) {
+class ScalaSpecialTypesITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var curProgId: Int = config.getInteger("ProgramId", -1)
- private var resultPath: String = null
- private var expectedResult: String = null
+ val _tempFolder = new TemporaryFolder()
- protected override def preSubmit(): Unit = {
- resultPath = getTempDirPath("result")
- }
+ @Rule def tempFolder = _tempFolder
- protected def testProgram(): Unit = {
- expectedResult = curProgId match {
- case 1 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val nums = env.fromElements(1, 2, 1, 2)
+ @Test
+ def testEither1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val nums = env.fromElements(1, 2, 1, 2)
- val eithers = nums.map(_ match {
- case 1 => Left(10)
- case 2 => Right(20)
- })
+ val eithers = nums.map(_ match {
+ case 1 => Left(10)
+ case 2 => Right(20)
+ })
- val result = eithers.map(_ match {
- case Left(i) => i
- case Right(i) => i
- }).reduce(_ + _).writeAsText(resultPath)
+ val resultPath = tempFolder.newFile().toPath.toUri.toString
- env.execute()
+ val result = eithers.map{
+ _ match {
+ case Left(i) => i
+ case Right(i) => i
+ }}.reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
- "60"
+ env.execute()
- case 2 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val nums = env.fromElements(1, 2, 1, 2)
+ compareResultsByLinesInMemory("60", resultPath)
+ }
- val eithers = nums.map(_ match {
- case 1 => Left(10)
- case 2 => Left(20)
- })
+ @Test
+ def testEither2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val nums = env.fromElements(1, 2, 1, 2)
- val result = eithers.map(_ match {
- case Left(i) => i
- }).reduce(_ + _).writeAsText(resultPath)
+ val eithers = nums.map(_ match {
+ case 1 => Left(10)
+ case 2 => Left(20)
+ })
- env.execute()
+ val resultPath = tempFolder.newFile().toPath.toUri.toString
- "60"
+ val result = eithers.map(_ match {
+ case Left(i) => i
+ }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
- case 3 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val nums = env.fromElements(1, 2, 1, 2)
+ env.execute()
- val eithers = nums.map(_ match {
- case 1 => Right(10)
- case 2 => Right(20)
- })
+ compareResultsByLinesInMemory("60", resultPath)
+ }
- val result = eithers.map(_ match {
- case Right(i) => i
- }).reduce(_ + _).writeAsText(resultPath)
+ @Test
+ def testEither3(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val nums = env.fromElements(1, 2, 1, 2)
- env.execute()
+ val eithers = nums.map(_ match {
+ case 1 => Right(10)
+ case 2 => Right(20)
+ })
- "60"
+ val resultPath = tempFolder.newFile().toPath.toUri.toString
- case 4 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val nums = env.fromElements(1, 2, 1, 2)
+ val result = eithers.map(_ match {
+ case Right(i) => i
+ }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
- val eithers = nums.map(_ match {
- case 1 => Some(10)
- case 2 => None
- })
+ env.execute()
- val result = eithers.map(_ match {
- case Some(i) => i
- case None => 20
- }).reduce(_ + _).writeAsText(resultPath)
+ compareResultsByLinesInMemory("60", resultPath)
+ }
- env.execute()
+ @Test
+ def testOption1(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val nums = env.fromElements(1, 2, 1, 2)
- "60"
+ val eithers = nums.map(_ match {
+ case 1 => Some(10)
+ case 2 => None
+ })
- case 5 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val nums = env.fromElements(1, 2, 1, 2)
+ val resultPath = tempFolder.newFile().toPath.toUri.toString
- val eithers = nums.map(_ match {
- case 1 => Some(10)
- case 2 => Some(20)
- })
- val result = eithers.map(_ match {
- case Some(i) => i
- }).reduce(_ + _).writeAsText(resultPath)
+ val result = eithers.map(_ match {
+ case Some(i) => i
+ case None => 20
+ }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
+ env.execute()
- "60"
+ compareResultsByLinesInMemory("60", resultPath)
+ }
- case 6 =>
- val env = ExecutionEnvironment.getExecutionEnvironment
- val nums = env.fromElements(1, 2, 1, 2)
+ @Test
+ def testOption2(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val nums = env.fromElements(1, 2, 1, 2)
- val eithers = nums.map(_ match {
- case 1 => None
- case 2 => None
- })
+ val eithers = nums.map(_ match {
+ case 1 => Some(10)
+ case 2 => Some(20)
+ })
- val result = eithers.map(_ match {
- case None => 20
- }).reduce(_ + _).writeAsText(resultPath)
+ val resultPath = tempFolder.newFile().toPath.toUri.toString
- env.execute()
+ val result = eithers.map(_ match {
+ case Some(i) => i
+ }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
- "80"
+ env.execute()
- case _ =>
- throw new IllegalArgumentException("Invalid program id")
- }
+ compareResultsByLinesInMemory("60", resultPath)
}
- protected override def postSubmit(): Unit = {
- compareResultsByLinesInMemory(expectedResult, resultPath)
- }
-}
+ @Test
+ def testOption3(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val nums = env.fromElements(1, 2, 1, 2)
+
+ val eithers = nums.map(_ match {
+ case 1 => None
+ case 2 => None
+ })
+
+ val resultPath = tempFolder.newFile().toPath.toUri.toString
-object ScalaSpecialTypesITCase {
- var NUM_PROGRAMS: Int = 6
+ val result = eithers.map(_ match {
+ case None => 20
+ }).reduce(_ + _).writeAsText(resultPath, WriteMode.OVERWRITE)
- @Parameters
- def getConfigurations: java.util.Collection[Array[AnyRef]] = {
- val configs = mutable.MutableList[Array[AnyRef]]()
- for (i <- 1 to ScalaSpecialTypesITCase.NUM_PROGRAMS) {
- val config = new Configuration()
- config.setInteger("ProgramId", i)
- configs += Array(config)
- }
+ env.execute()
- configs.asJavaCollection
+ compareResultsByLinesInMemory("80", resultPath)
}
}