Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/13 15:44:05 UTC

[1/2] flink git commit: [FLINK-1595] [streaming] Added complex streaming integration test

Repository: flink
Updated Branches:
  refs/heads/master 73493335f -> 48e21a1ae


[FLINK-1595] [streaming] Added complex streaming integration test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4786b56e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4786b56e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4786b56e

Branch: refs/heads/master
Commit: 4786b56e588c9812c635e1b4d9c28af68041f754
Parents: 7349333
Author: szape <ne...@gmail.com>
Authored: Tue Apr 21 11:01:08 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed May 13 09:51:43 2015 +0200

----------------------------------------------------------------------
 .../SlidingTimeGroupedPreReducer.java           |   8 +-
 .../windowbuffer/SlidingTimePreReducer.java     |   8 +-
 .../api/complex/ComplexIntegrationTest.java     | 749 +++++++++++++++++++
 .../flink/streaming/util/RectangleClass.java    |  43 ++
 .../util/StreamingMultipleProgramsTestBase.java |  40 +
 5 files changed, 842 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
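The two SlidingTime*PreReducer hunks below guard afterEmit() against a null lastStored, which could otherwise throw a NullPointerException when a window is emitted before any element has been stored. A minimal standalone sketch of the guarded slide-advance logic follows; the field names (lastStored, windowStartTime, slideSize) mirror the patch, but the wrapping class and the main() driver are hypothetical and only illustrate the intended behaviour:

	public class SlideAdvanceSketch {
		private Long lastStored;           // last buffered element; null until something is stored
		private long windowStartTime = 0;  // start of the current slide
		private final long slideSize = 1000;

		// Mirrors the patched afterEmit(): only advance the slide when an
		// element was stored and its timestamp moved past the current window.
		void afterEmit() {
			if (lastStored != null) {
				long lastTime = lastStored; // stands in for timestampWrapper.getTimestamp(lastStored)
				if (lastTime - windowStartTime >= slideSize) {
					windowStartTime += slideSize;
				}
			}
		}

		public static void main(String[] args) {
			SlideAdvanceSketch sketch = new SlideAdvanceSketch();
			sketch.afterEmit();            // no NPE, nothing stored yet
			sketch.lastStored = 2500L;
			sketch.afterEmit();
			System.out.println(sketch.windowStartTime); // prints 1000
		}
	}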


http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
index 3724ce5..cdb4207 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
@@ -69,9 +69,11 @@ public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T>
 
 	@Override
 	protected void afterEmit() {
-		long lastTime = timestampWrapper.getTimestamp(lastStored);
-		if (lastTime - windowStartTime >= slideSize) {
-			windowStartTime = windowStartTime + slideSize;
+		if (lastStored != null) {
+			long lastTime = timestampWrapper.getTimestamp(lastStored);
+			if (lastTime - windowStartTime >= slideSize) {
+				windowStartTime = windowStartTime + slideSize;
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index 2382b48..7652d81 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -68,9 +68,11 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
 
 	@Override
 	protected void afterEmit() {
-		long lastTime = timestampWrapper.getTimestamp(lastStored);
-		if (lastTime - windowStartTime >= slideSize) {
-			windowStartTime = windowStartTime + slideSize;
+		if (lastStored != null) {
+			long lastTime = timestampWrapper.getTimestamp(lastStored);
+			if (lastTime - windowStartTime >= slideSize) {
+				windowStartTime = windowStartTime + slideSize;
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
new file mode 100644
index 0000000..74b5f1d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -0,0 +1,749 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.complex;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.RectangleClass;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private String resultPath1;
+	private String resultPath2;
+	private String expected1;
+	private String expected2;
+
+	public ComplexIntegrationTest(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath1 = tempFolder.newFile().toURI().toString();
+		resultPath2 = tempFolder.newFile().toURI().toString();
+		expected1 = "";
+		expected2 = "";
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected1, resultPath1);
+		compareResultsByLinesInMemory(expected2, resultPath2);
+	}
+
+	@Test
+	public void complexIntegrationTest1() throws Exception {
+		//Testing data stream splitting with tuples
+
+		expected1 = "";
+		for (int i = 0; i < 8; i++) {
+			expected1 += "(10,(a,1))\n";
+		}
+		//i == 8
+		expected1 += "(10,(a,1))";
+
+		expected2 = "";
+		for (int i = 0; i < 18; i++) {
+			expected2 += "(20,(a,1))\n";
+		}
+		//i == 18
+		expected2 += "(20,(a,1))";
+
+		//We create a separate environment for this test because of slot-related issues with iterations.
+		StreamExecutionEnvironment env = new TestStreamEnvironment(4, 32); //StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
+
+		IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.sum(0).setParallelism(1).filter(new FilterFunction
+				<Tuple2<Long, Tuple2<String, Long>>>() {
+
+			@Override
+			public boolean filter(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
+				return value.f0 < 20;
+			}
+		}).iterate(5000);
+
+		SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new
+				MyOutputSelector());
+		it.closeWith(step.select("iterate"));
+
+		step.select("firstOutput")
+				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+		step.select("secondOutput")//.print();
+				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void complexIntegrationTest2() throws Exception {
+		//Testing POJO source, grouping by multiple fields and windowing with timestamp
+
+		expected1 = "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+				"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+				"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+				"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+				"water_melon-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" +
+				"orange-b\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-d\n" + "orange-d\n" +
+				"peach-d\n" + "peach-d\n";
+
+		List<Tuple5<Integer, String, Character, Double, Boolean>> input = Arrays.asList(
+				new Tuple5<Integer, String, Character, Double, Boolean>(1, "apple", 'j', 0.1, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(1, "peach", 'b', 0.8, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(1, "orange", 'c', 0.7, true),
+				new Tuple5<Integer, String, Character, Double, Boolean>(2, "apple", 'd', 0.5, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(2, "peach", 'j', 0.6, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(3, "orange", 'b', 0.2, true),
+				new Tuple5<Integer, String, Character, Double, Boolean>(6, "apple", 'c', 0.1, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(7, "peach", 'd', 0.4, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(8, "orange", 'j', 0.2, true),
+				new Tuple5<Integer, String, Character, Double, Boolean>(10, "apple", 'b', 0.1, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(10, "peach", 'c', 0.5, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(11, "orange", 'd', 0.3, true),
+				new Tuple5<Integer, String, Character, Double, Boolean>(11, "apple", 'j', 0.3, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(12, "peach", 'b', 0.9, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(13, "orange", 'c', 0.7, true),
+				new Tuple5<Integer, String, Character, Double, Boolean>(15, "apple", 'd', 0.2, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(16, "peach", 'j', 0.8, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(16, "orange", 'b', 0.8, true),
+				new Tuple5<Integer, String, Character, Double, Boolean>(16, "apple", 'c', 0.1, false),
+				new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setBufferTimeout(100);
+		env.setParallelism(4);
+
+		SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
+		DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
+
+		sourceStream21
+				.sum(3)
+				.groupBy(2, 2)
+				.window(Time.of(10, new MyTimestamp(), 0))
+				.every(Time.of(4, new MyTimestamp(), 0))
+				.maxBy(3)
+				.flatten()
+				.map(new MyMapFunction2())
+				.flatMap(new MyFlatMapFunction())
+				.connect(sourceStream22)
+				.map(new MyCoMapFunction())
+				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void complexIntegrationTest3() throws Exception {
+		//Heavy prime factorisation with maps and flatmaps
+
+		expected1 = "541\n" + "1223\n" + "3319\n" + "5851\n" + "1987\n" + "8387\n" + "15907\n" + "10939\n" +
+				"4127\n" + "2477\n" + "6737\n" + "13421\n" + "4987\n" + "4999\n" + "18451\n" + "9283\n" + "7499\n" +
+				"16937\n" + "11927\n" + "9973\n" + "14431\n" + "19507\n" + "12497\n" + "17497\n" + "14983\n" +
+				"19997\n";
+
+		for (int i = 2; i < 100; i++) {
+			expected2 += "(" + i + "," + 20000 / i + ")\n";
+		}
+		for (int i = 19901; i < 20000; i++) {
+			expected2 += "(" + i + "," + 20000 / i + ")\n";
+		}
+		//i == 20000
+		expected2 += "(" + 20000 + "," + 1 + ")";
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
+		DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
+
+		sourceStream31.filter(new PrimeFilterFunction())
+				.window(Count.of(100))
+				.max(0).flatten()
+				.merge(sourceStream32.filter(new PrimeFilterFunction())
+						.window(Count.of(100))
+						.max(0).flatten())
+				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		sourceStream31.flatMap(new DivisorsFlatMapFunction())
+				.merge(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
+				Integer>>() {
+
+			@Override
+			public Tuple2<Long, Integer> map(Long value) throws Exception {
+				return new Tuple2<Long, Integer>(value, 1);
+			}
+		})
+//				.groupBy(0)
+//				.sum(1)
+				.groupBy(0)
+				.window(Count.of(10000)).sum(1).flatten()
+				.filter(new FilterFunction<Tuple2<Long, Integer>>() {
+
+					@Override
+					public boolean filter(Tuple2<Long, Integer> value) throws Exception {
+						return value.f0 < 100 || value.f0 > 19900;
+					}
+				})
+				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+	}
+
+	@Test
+	public void complexIntegrationTest4() throws Exception {
+		//Testing mapping and delta-policy windowing with custom class
+
+		expected1 = "((100,100),0)\n" + "((120,122),5)\n" + "((121,125),6)\n" + "((138,144),9)\n" +
+				"((139,147),10)\n" + "((156,166),13)\n" + "((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" +
+				"((192,210),21)\n" + "((193,213),22)\n" + "((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" +
+				"((229,257),30)\n" + "((246,276),33)\n" + "((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" +
+				"((282,320),41)\n" + "((283,323),42)\n" + "((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" +
+				"((319,367),50)\n" + "((336,386),53)\n" + "((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" +
+				"((372,430),61)\n" + "((373,433),62)\n" + "((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" +
+				"((409,477),70)\n" + "((426,496),73)\n" + "((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" +
+				"((462,540),81)\n" + "((463,543),82)\n" + "((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" +
+				"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		env.addSource(new RectangleSource())
+				.global()
+				.map(new RectangleMapFunction())
+				.window(Delta.of(0.0, new MyDelta(), new Tuple2<RectangleClass, Integer>(new RectangleClass(100, 100), 0)))
+				.mapWindow(new MyWindowMapFunction())
+				.flatten()
+				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+	}
+
+	private static class MyDelta implements DeltaFunction<Tuple2<RectangleClass, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public double getDelta(Tuple2<RectangleClass, Integer> oldDataPoint, Tuple2<RectangleClass,
+				Integer> newDataPoint) {
+			return (newDataPoint.f0.b - newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a);
+		}
+	}
+
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void complexIntegrationTest5() throws Exception {
+		//Turning on and off chaining
+
+		expected1 = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" + "5\n" + "5\n" +
+				"5\n" + "5\n" + "5\n" + "1\n" + "3\n" + "3\n" + "4\n" + "5\n" + "5\n" + "6\n" + "8\n" + "9\n" + "10\n" +
+				"12\n" + "15\n" + "16\n" + "20\n" + "25\n";
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataStream<Long> dataStream51 = env.generateSequence(1, 5)
+				.map(new MapFunction<Long, Long>() {
+
+					@Override
+					public Long map(Long value) throws Exception {
+						return value;
+					}
+				}).startNewChain()
+				.filter(new FilterFunction<Long>() {
+
+					@Override
+					public boolean filter(Long value) throws Exception {
+						return true;
+					}
+				}).disableChaining()
+				.flatMap(new SquareFlatMapFunction());
+
+		DataStream<Long> dataStream52 = dataStream51.fold(0L, new FoldFunction<Long, Long>() {
+
+			@Override
+			public Long fold(Long accumulator, Long value) throws Exception {
+				return accumulator + value;
+			}
+		}).map(new MapFunction<Long, Long>() {
+
+			@Override
+			public Long map(Long value) throws Exception {
+				return value;
+			}
+		}).disableChaining();
+
+		DataStream<Long> dataStream53 = dataStream51.map(new MapFunction<Long, Long>() {
+
+			@Override
+			public Long map(Long value) throws Exception {
+				return value;
+			}
+		});
+
+
+		dataStream53.merge(dataStream52)//.print();
+				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+	}
+
+
+	@Test
+	public void complexIntegrationTest6() throws Exception {
+		//Testing java collections and date-time types
+
+		expected1 = "(6,(a,6))\n" + "(6,(b,3))\n" + "(6,(c,4))\n" + "(6,(d,2))\n" + "(6,(f,2))\n" +
+				"(7,(a,1))\n" + "(7,(b,2))\n" + "(7,(c,3))\n" + "(7,(d,1))\n" + "(7,(e,1))\n" + "(7,(f,1))\n" +
+				"(8,(a,6))\n" + "(8,(b,4))\n" + "(8,(c,5))\n" + "(8,(d,1))\n" + "(8,(e,2))\n" + "(8,(f,2))\n" +
+				"(9,(a,4))\n" + "(9,(b,4))\n" + "(9,(c,7))\n" + "(9,(d,3))\n" + "(9,(e,1))\n" + "(9,(f,2))\n" +
+				"(10,(a,3))\n" + "(10,(b,2))\n" + "(10,(c,3))\n" + "(10,(d,2))\n" + "(10,(e,1))\n" + "(10,(f,1))";
+		expected2 = "[a, a, c, c, d, f]\n" + "[a, b, b, d]\n" + "[a, a, a, b, c, c, f]\n" + "[a, d, e]\n" +
+				"[b, b, c, c, c, f]\n" + "[a, a, a, a, b, b, c, c, e]\n" + "[a, a, b, b, c, c, c, d, e, f, f]\n" +
+				"[a, a, a, b, c, c, c, d, d, f]\n" + "[a, b, b, b, c, c, c, c, d, e, f]\n" +
+				"[a, a, a, b, b, c, c, c, d, d, e, f]";
+
+		SimpleDateFormat ft = new SimpleDateFormat("dd-MM-yyyy");
+
+		ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<Tuple2<Date, HashMap<Character,
+				Integer>>>();
+		HashMap<Character, Integer> sale1 = new HashMap<Character, Integer>();
+		sale1.put('a', 2);
+		sale1.put('c', 2);
+		sale1.put('d', 1);
+		sale1.put('f', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("03-06-2014"), sale1));
+
+		HashMap<Character, Integer> sale2 = new HashMap<Character, Integer>();
+		sale2.put('a', 1);
+		sale2.put('b', 2);
+		sale2.put('d', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("10-06-2014"), sale2));
+
+		HashMap<Character, Integer> sale3 = new HashMap<Character, Integer>();
+		sale3.put('a', 3);
+		sale3.put('b', 1);
+		sale3.put('c', 2);
+		sale3.put('f', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("29-06-2014"), sale3));
+
+		HashMap<Character, Integer> sale4 = new HashMap<Character, Integer>();
+		sale4.put('a', 1);
+		sale4.put('d', 1);
+		sale4.put('e', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("15-07-2014"), sale4));
+
+		HashMap<Character, Integer> sale5 = new HashMap<Character, Integer>();
+		sale5.put('b', 2);
+		sale5.put('c', 3);
+		sale5.put('f', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("24-07-2014"), sale5));
+
+		HashMap<Character, Integer> sale6 = new HashMap<Character, Integer>();
+		sale6.put('a', 4);
+		sale6.put('b', 2);
+		sale6.put('c', 2);
+		sale6.put('e', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("17-08-2014"), sale6));
+
+		HashMap<Character, Integer> sale7 = new HashMap<Character, Integer>();
+		sale7.put('a', 2);
+		sale7.put('b', 2);
+		sale7.put('c', 3);
+		sale7.put('d', 1);
+		sale7.put('e', 1);
+		sale7.put('f', 2);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("27-08-2014"), sale7));
+
+		HashMap<Character, Integer> sale8 = new HashMap<Character, Integer>();
+		sale8.put('a', 3);
+		sale8.put('b', 1);
+		sale8.put('c', 3);
+		sale8.put('d', 2);
+		sale8.put('f', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("16-09-2014"), sale8));
+
+		HashMap<Character, Integer> sale9 = new HashMap<Character, Integer>();
+		sale9.put('a', 1);
+		sale9.put('b', 3);
+		sale9.put('c', 4);
+		sale9.put('d', 1);
+		sale9.put('e', 1);
+		sale9.put('f', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("25-09-2014"), sale9));
+
+		HashMap<Character, Integer> sale10 = new HashMap<Character, Integer>();
+		sale10.put('a', 3);
+		sale10.put('b', 2);
+		sale10.put('c', 3);
+		sale10.put('d', 2);
+		sale10.put('e', 1);
+		sale10.put('f', 1);
+		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
+		sourceStream6.window(Time.of(1, new Timestamp6()))
+				.reduceWindow(new SalesReduceFunction())
+				.flatten()
+				.flatMap(new FlatMapFunction6())
+				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		sourceStream6.map(new MapFunction6())
+				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+	}
+
+	private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
+			Double, Boolean>> {
+
+		@Override
+		public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double,
+				Boolean> value) throws Exception {
+			return new Tuple4<Integer, String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2,
+					value.f3, value.f4);
+		}
+
+	}
+
+	public static class InnerPojo {
+		public Long f0;
+		public String f1;
+
+		public InnerPojo(Long f0, String f1) {
+			this.f0 = f0;
+			this.f1 = f1;
+		}
+
+		@Override
+		public String toString() {
+			return "POJO(" + f0 + "," + f1 + ")";
+		}
+	}
+
+	public static class OuterPojo {
+		public InnerPojo f0;
+		public Long f1;
+
+		public OuterPojo(InnerPojo f0, Long f1) {
+			this.f0 = f0;
+			this.f1 = f1;
+		}
+
+		@Override
+		public String toString() {
+			return "POJO(" + f0 + "," + f1 + ")";
+		}
+	}
+
+	private static class PojoSource implements SourceFunction<OuterPojo> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void run(Collector<OuterPojo> collector) throws Exception {
+			for (long i = 0; i < 20; i++) {
+				collector.collect(new OuterPojo(new InnerPojo(i / 2, "water_melon-b"), 2L));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// no cleanup needed
+		}
+	}
+
+	private static class TupleSource implements SourceFunction<Tuple2<Long, Tuple2<String, Long>>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void run(Collector<Tuple2<Long, Tuple2<String, Long>>> collector) throws Exception {
+			for (int i = 0; i < 20; i++) {
+				collector.collect(new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L)));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// no cleanup needed
+		}
+	}
+
+	private class IncrementMap implements MapFunction<Tuple2<Long, Tuple2<String, Long>>, Tuple2<Long, Tuple2<String,
+			Long>>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
+			return new Tuple2<Long, Tuple2<String, Long>>(value.f0 + 1, value.f1);
+		}
+	}
+
+	private static class MyTimestamp implements Timestamp<Tuple5<Integer, String, Character, Double, Boolean>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value) {
+			return (long) value.f0;
+		}
+	}
+
+	private static class MyFlatMapFunction implements FlatMapFunction<Tuple4<Integer, String, Double,
+			Boolean>, OuterPojo> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Tuple4<Integer, String, Double, Boolean> value, Collector<OuterPojo> out) throws
+				Exception {
+			if (value.f3) {
+				for (int i = 0; i < 2; i++) {
+					out.collect(new OuterPojo(new InnerPojo((long) value.f0, value.f1), (long) i));
+				}
+			}
+		}
+	}
+
+	private class MyCoMapFunction implements CoMapFunction<OuterPojo, OuterPojo, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(OuterPojo value) {
+			return value.f0.f1;
+		}
+
+		@Override
+		public String map2(OuterPojo value) {
+			return value.f0.f1;
+		}
+	}
+
+	private class MyOutputSelector implements OutputSelector<Tuple2<Long, Tuple2<String, Long>>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> select(Tuple2<Long, Tuple2<String, Long>> value) {
+			List<String> output = new ArrayList<String>();
+			if (value.f0 == 10) {
+				output.add("iterate");
+				output.add("firstOutput");
+			} else if (value.f0 == 20) {
+				output.add("secondOutput");
+			} else {
+				output.add("iterate");
+			}
+			return output;
+		}
+	}
+
+	private static class PrimeFilterFunction implements FilterFunction<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Long value) throws Exception {
+			if (value < 2) {
+				return false;
+			} else {
+				for (long i = 2; i < value; i++) {
+					if (value % i == 0) {
+						return false;
+					}
+				}
+			}
+			return true;
+		}
+	}
+
+	private static class DivisorsFlatMapFunction implements FlatMapFunction<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Long value, Collector<Long> out) throws Exception {
+			for (long i = 2; i <= value; i++) {
+				if (value % i == 0) {
+					out.collect(i);
+				}
+			}
+		}
+	}
+
+	private static class RectangleSource implements SourceFunction<RectangleClass> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void run(Collector<RectangleClass> collector) throws Exception {
+			RectangleClass rectangle = new RectangleClass(100, 100);
+
+			for (int i = 0; i < 100; i++) {
+				collector.collect(rectangle);
+				rectangle = rectangle.next();
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// no cleanup needed
+		}
+	}
+
+	private static class RectangleMapFunction implements MapFunction<RectangleClass, Tuple2<RectangleClass, Integer>> {
+		private static final long serialVersionUID = 1L;
+		private int counter = 0;
+
+		@Override
+		public Tuple2<RectangleClass, Integer> map(RectangleClass value) throws Exception {
+			return new Tuple2<RectangleClass, Integer>(value, counter++);
+		}
+	}
+
+	private static class MyWindowMapFunction implements WindowMapFunction<Tuple2<RectangleClass, Integer>,
+			Tuple2<RectangleClass, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void mapWindow(Iterable<Tuple2<RectangleClass, Integer>> values, Collector<Tuple2<RectangleClass,
+				Integer>> out) throws Exception {
+			out.collect(values.iterator().next());
+		}
+	}
+
+	private static class SquareFlatMapFunction implements FlatMapFunction<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Long value, Collector<Long> out) throws Exception {
+			for (long i = 0; i < value; i++) {
+				out.collect(value);
+			}
+		}
+	}
+
+	private static class Timestamp6 implements Timestamp<Tuple2<Date, HashMap<Character, Integer>>> {
+
+		@Override
+		public long getTimestamp(Tuple2<Date, HashMap<Character, Integer>> value) {
+			Calendar cal = Calendar.getInstance();
+			cal.setTime(value.f0);
+			return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH);
+		}
+	}
+
+	private static class SalesReduceFunction implements ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Date, HashMap<Character, Integer>> reduce(Tuple2<Date, HashMap<Character, Integer>> value1,
+				Tuple2<Date,
+						HashMap<Character, Integer>> value2) throws Exception {
+			HashMap<Character, Integer> map1 = value1.f1;
+			HashMap<Character, Integer> map2 = value2.f1;
+			for (Character key : map2.keySet()) {
+				Integer volume1 = map1.get(key);
+				Integer volume2 = map2.get(key);
+				if (volume1 == null) {
+					volume1 = 0;
+				}
+				map1.put(key, volume1 + volume2);
+			}
+			return new Tuple2<Date, HashMap<Character, Integer>>(value2.f0, map1);
+		}
+	}
+
+	private static class FlatMapFunction6 implements FlatMapFunction<Tuple2<Date, HashMap<Character, Integer>>, Tuple2<Integer,
+			Tuple2<Character, Integer>>> {
+
+		@Override
+		public void flatMap(Tuple2<Date, HashMap<Character, Integer>> value, Collector<Tuple2<Integer,
+				Tuple2<Character, Integer>>> out) throws Exception {
+			Calendar cal = Calendar.getInstance();
+			cal.setTime(value.f0);
+			for (Character key : value.f1.keySet()) {
+				out.collect(new Tuple2<Integer, Tuple2<Character, Integer>>(cal.get(Calendar.MONTH)
+						+ 1,
+						new Tuple2<Character, Integer>(key, value.f1.get(key))));
+			}
+		}
+	}
+
+	private static class MapFunction6 implements MapFunction<Tuple2<Date, HashMap<Character, Integer>>, ArrayList<Character>> {
+
+		@Override
+		public ArrayList<Character> map(Tuple2<Date, HashMap<Character, Integer>> value)
+				throws Exception {
+			ArrayList<Character> list = new ArrayList<Character>();
+			for (Character ch : value.f1.keySet()) {
+				for (int i = 0; i < value.f1.get(ch); i++) {
+					list.add(ch);
+				}
+			}
+			Collections.sort(list);
+			return list;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
new file mode 100644
index 0000000..24fe1eb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.Serializable;
+
+public class RectangleClass implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	public int a;
+	public int b;
+
+	public RectangleClass(int a, int b) {
+		this.a = a;
+		this.b = b;
+	}
+
+	public RectangleClass next() {
+		return new RectangleClass(a + (b % 11), b + (a % 9));
+	}
+
+	@Override
+	public String toString() {
+		return "(" + a + "," + b + ")";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
new file mode 100644
index 0000000..55fc5e9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class StreamingMultipleProgramsTestBase extends MultipleProgramsTestBase {
+	public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+
+	@Parameterized.Parameters(name = "Execution mode = {0}")
+	public static Collection<TestExecutionMode[]> executionModes() {
+		TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
+		ArrayList<TestExecutionMode[]> temsList = new ArrayList<TestExecutionMode[]>();
+		temsList.add(tems);
+		return temsList;
+	}
+}


[2/2] flink git commit: [streaming] Added proper StreamingMultipleProgramsTestBase

Posted by mb...@apache.org.
[streaming] Added proper StreamingMultipleProgramsTestBase

Also minor fixes for streaming complex integration test
Closes #520


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48e21a1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48e21a1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48e21a1a

Branch: refs/heads/master
Commit: 48e21a1ae5ac1e762aa670f53ef1a976bacabf8c
Parents: 4786b56
Author: mbalassi <mb...@apache.org>
Authored: Tue May 12 16:21:35 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed May 13 12:08:38 2015 +0200

----------------------------------------------------------------------
 .../api/complex/ComplexIntegrationTest.java     | 124 ++++++++++++-------
 .../flink/streaming/util/RectangleClass.java    |  43 -------
 .../util/StreamingMultipleProgramsTestBase.java |  43 ++++++-
 .../scala/table/test/AggregationsITCase.scala   |   2 +-
 .../flink/api/scala/table/test/AsITCase.scala   |   2 +-
 .../api/scala/table/test/CastingITCase.scala    |   2 +-
 .../scala/table/test/ExpressionsITCase.scala    |   2 +-
 .../api/scala/table/test/FilterITCase.scala     |   2 +-
 .../table/test/GroupedAggreagationsITCase.scala |   2 +-
 .../flink/api/scala/table/test/JoinITCase.scala |   2 +-
 .../api/scala/table/test/SelectITCase.scala     |   2 +-
 .../table/test/StringExpressionsITCase.scala    |   2 +-
 .../util/AbstractMultipleProgramsTestBase.java  |  84 +++++++++++++
 .../test/util/MultipleProgramsTestBase.java     |  35 +-----
 .../api/scala/actions/CountCollectITCase.scala  |   2 +-
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 .../scala/io/ScalaCsvReaderWithPOJOITCase.scala |   2 +-
 .../api/scala/operators/AggregateITCase.scala   |   2 +-
 .../api/scala/operators/CoGroupITCase.scala     |   2 +-
 .../flink/api/scala/operators/CrossITCase.scala |   2 +-
 .../api/scala/operators/DistinctITCase.scala    |   2 +-
 .../api/scala/operators/ExamplesITCase.scala    |   2 +-
 .../api/scala/operators/FilterITCase.scala      |   2 +-
 .../api/scala/operators/FirstNITCase.scala      |   2 +-
 .../api/scala/operators/FlatMapITCase.scala     |   2 +-
 .../scala/operators/GroupCombineITCase.scala    |   2 +-
 .../api/scala/operators/GroupReduceITCase.scala |   2 +-
 .../flink/api/scala/operators/JoinITCase.scala  |   2 +-
 .../flink/api/scala/operators/MapITCase.scala   |   2 +-
 .../api/scala/operators/PartitionITCase.scala   |   2 +-
 .../api/scala/operators/ReduceITCase.scala      |   2 +-
 .../api/scala/operators/SumMinMaxITCase.scala   |   2 +-
 .../flink/api/scala/operators/UnionITCase.scala |   2 +-
 .../scala/runtime/ScalaSpecialTypesITCase.scala |   2 +-
 34 files changed, 236 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 74b5f1d..67c1387 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.RectangleClass;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
@@ -64,8 +63,11 @@ import java.util.HashMap;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase implements Serializable {
-	private static final long serialVersionUID = 1L;
+public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+	// *************************************************************************
+	// GENERAL SETUP
+	// *************************************************************************
 
 	private String resultPath1;
 	private String resultPath2;
@@ -93,6 +95,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		compareResultsByLinesInMemory(expected2, resultPath2);
 	}
 
+	// *************************************************************************
+	// INTEGRATION TESTS
+	// *************************************************************************
+
 	@Test
 	public void complexIntegrationTest1() throws Exception {
 		//Testing data stream splitting with tuples
@@ -113,7 +119,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 		//We create a separate environment for this test because of slot-related issues with iterations.
 		StreamExecutionEnvironment env = new TestStreamEnvironment(4, 32); //StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 		DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
 
 		IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.sum(0).setParallelism(1).filter(new FilterFunction
@@ -131,7 +136,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 		step.select("firstOutput")
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-		step.select("secondOutput")//.print();
+		step.select("secondOutput")
 				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
 
 		env.execute();
@@ -173,8 +178,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setBufferTimeout(100);
-		env.setParallelism(4);
 
 		SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
 		DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
@@ -215,7 +218,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		expected2 += "(" + 20000 + "," + 1 + ")";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
 		DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
@@ -237,8 +239,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				return new Tuple2<Long, Integer>(value, 1);
 			}
 		})
-//				.groupBy(0)
-//				.sum(1)
 				.groupBy(0)
 				.window(Count.of(10000)).sum(1).flatten()
 				.filter(new FilterFunction<Tuple2<Long, Integer>>() {
@@ -269,7 +269,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		env.addSource(new RectangleSource())
 				.global()
@@ -303,7 +302,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				"12\n" + "15\n" + "16\n" + "20\n" + "25\n";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		DataStream<Long> dataStream51 = env.generateSequence(1, 5)
 				.map(new MapFunction<Long, Long>() {
@@ -345,7 +343,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		});
 
 
-		dataStream53.merge(dataStream52)//.print();
+		dataStream53.merge(dataStream52)
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		env.execute();
@@ -445,7 +443,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
 		sourceStream6.window(Time.of(1, new Timestamp6()))
@@ -461,6 +458,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 	}
 
+	// *************************************************************************
+	// FUNCTIONS
+	// *************************************************************************
+
 	private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
 			Double, Boolean>> {
 
@@ -473,36 +474,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 	}
 
-	public static class InnerPojo {
-		public Long f0;
-		public String f1;
-
-		public InnerPojo(Long f0, String f1) {
-			this.f0 = f0;
-			this.f1 = f1;
-		}
-
-		@Override
-		public String toString() {
-			return "POJO(" + f0 + "," + f1 + ")";
-		}
-	}
-
-	public static class OuterPojo {
-		public InnerPojo f0;
-		public Long f1;
-
-		public OuterPojo(InnerPojo f0, Long f1) {
-			this.f0 = f0;
-			this.f1 = f1;
-		}
-
-		@Override
-		public String toString() {
-			return "POJO(" + f0 + "," + f1 + ")";
-		}
-	}
-
 	private static class PojoSource implements SourceFunction<OuterPojo> {
 		private static final long serialVersionUID = 1L;
 
@@ -746,4 +717,69 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		}
 	}
 
+	// *************************************************************************
+	// DATA TYPES
+	// *************************************************************************
+
+	//Flink Pojo
+	public static class InnerPojo {
+		public Long f0;
+		public String f1;
+
+		//default constructor to qualify as Flink POJO
+		InnerPojo(){}
+
+		public InnerPojo(Long f0, String f1) {
+			this.f0 = f0;
+			this.f1 = f1;
+		}
+
+		@Override
+		public String toString() {
+			return "POJO(" + f0 + "," + f1 + ")";
+		}
+	}
+
+	// Nested class serialized with Kryo
+	public static class OuterPojo {
+		public InnerPojo f0;
+		public Long f1;
+
+		public OuterPojo(InnerPojo f0, Long f1) {
+			this.f0 = f0;
+			this.f1 = f1;
+		}
+
+		@Override
+		public String toString() {
+			return "POJO(" + f0 + "," + f1 + ")";
+		}
+	}
+
+	public static class RectangleClass implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int a;
+		public int b;
+
+		//default constructor to qualify as Flink POJO
+		public RectangleClass() {}
+
+		public RectangleClass(int a, int b) {
+			this.a = a;
+			this.b = b;
+		}
+
+		public RectangleClass next() {
+			return new RectangleClass(a + (b % 11), b + (a % 9));
+		}
+
+		@Override
+		public String toString() {
+			return "(" + a + "," + b + ")";
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
deleted file mode 100644
index 24fe1eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-
-public class RectangleClass implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	public int a;
-	public int b;
-
-	public RectangleClass(int a, int b) {
-		this.a = a;
-		this.b = b;
-	}
-
-	public RectangleClass next() {
-		return new RectangleClass(a + (b % 11), b + (a % 9));
-	}
-
-	@Override
-	public String toString() {
-		return "(" + a + "," + b + ")";
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 55fc5e9..36e62f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,18 +18,55 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
 import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
-public class StreamingMultipleProgramsTestBase extends MultipleProgramsTestBase {
+/**
+ * Base class for streaming unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the StreamExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ *   @Test
+ *   public void someTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   @Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * }</pre>
+ */
+public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+
 	public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
 		super(mode);
+		switch(this.mode){
+			case CLUSTER:
+				TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+				clusterEnv.setAsContext();
+				break;
+			case COLLECTION:
+				throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
+		}
 	}
 
-
 	@Parameterized.Parameters(name = "Execution mode = {0}")
 	public static Collection<TestExecutionMode[]> executionModes() {
 		TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 9049f3c..38be85e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 536bad4..3a0cc69 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 4c0e624..9557985 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 9a52f8f..51dc428 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index d1f5485..982a302 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 30498fa..1f29722 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 53d1da0..17221d8 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 2396cea..6ba6c9f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index c5950bf..3f0f46f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
new file mode 100644
index 0000000..ef81dfe
--- /dev/null
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Abstract base class for test classes that run multiple test methods and want to reuse
+ * the same Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of a Flink cluster (including actor systems, etc.) usually dominate
+ * the execution of the actual tests.
+ *
+ * To write a test against this test base, simply extend it, add one or more
+ * regular test methods, and retrieve the ExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ *   @Test
+ *   public void someTest() {
+ *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   @Test
+ *   public void anotherTest() {
+ *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * }</pre>
+ */
+public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
+
+	/**
+	 * Enum that defines which execution environment to run the next test on:
+	 * an embedded local Flink cluster, or the collection execution backend.
+	 */
+	public enum TestExecutionMode {
+		CLUSTER,
+		COLLECTION
+	}
+
+	// -----------------------------------------------------------------------------------------...
+
+	private static final int DEFAULT_PARALLELISM = 4;
+
+	protected static ForkableFlinkMiniCluster cluster = null;
+
+	protected transient TestExecutionMode mode;
+
+	public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
+		this.mode = mode;
+	}
+
+	@BeforeClass
+	public static void setup() throws Exception{
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+}
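
The diff below shows MultipleProgramsTestBase delegating its cluster setup and teardown to
this new base class; concrete tests keep extending MultipleProgramsTestBase and only move
their TestExecutionMode import, as the surrounding per-file hunks do. A minimal sketch of
such a test (hypothetical class name and data, not code from this commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
    import org.apache.flink.test.util.MultipleProgramsTestBase;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    import static org.junit.Assert.assertEquals;

    @RunWith(Parameterized.class)
    public class ExampleITCase extends MultipleProgramsTestBase {

        public ExampleITCase(TestExecutionMode mode) {
            super(mode);
        }

        @Test
        public void countsElements() throws Exception {
            // The constructor has already set the shared test cluster as the context
            // environment, so getExecutionEnvironment() returns it here.
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Integer> data = env.fromElements(1, 2, 3, 4);
            assertEquals(4L, data.count());
        }
    }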

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 3e4bb33..e0c4360 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
@@ -53,28 +51,11 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class MultipleProgramsTestBase extends TestBaseUtils {
-
-	/**
-	 * Enum that defines which execution environment to run the next test on:
-	 * An embedded local flink cluster, or the collection execution backend.
-	 */
-	public enum TestExecutionMode {
-		CLUSTER,
-		COLLECTION
-	}
-
-	// -----------------------------------------------------------------------------------------...
-
-	private static final int DEFAULT_PARALLELISM = 4;
-
-	protected static ForkableFlinkMiniCluster cluster = null;
-
-	protected transient TestExecutionMode mode;
+public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
 
 	public MultipleProgramsTestBase(TestExecutionMode mode){
-		this.mode = mode;
-		switch(mode){
+		super(mode);
+		switch(this.mode){
 			case CLUSTER:
 				TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
 				clusterEnv.setAsContext();
@@ -86,16 +67,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 		}
 	}
 
-	@BeforeClass
-	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-
 	@Parameterized.Parameters(name = "Execution mode = {0}")
 	public static Collection<TestExecutionMode[]> executionModes(){
 		return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
index 2d2cdd3..d19f543 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.api.scala.actions
 
 import org.apache.flink.api.scala._
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 
 import org.junit.Test
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 2cae79c..e9b4ffe 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.functions
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.junit.Assert.fail
 import org.junit.{After, Before, Test, Rule}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
index 21aa93d..7e395d9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit.Assert._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 484226d..432a6d4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 3379fe2..eaf4117 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index a8611c8..512ec6c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 31c6052..8c10271 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index 2bccc5b..7cf802c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 8336ae3..082201b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index 9c59206..36cc3a7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index 4a66b80..61751da 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.junit.{Test, After, Before, Rule}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 380b3bc..1b40205 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.test.javaApiOperators.GroupCombineITCase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.util.Collector
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index fe9e3f3..b832647 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -28,7 +28,7 @@ CustomType}
 import org.apache.flink.optimizer.Optimizer
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.hamcrest.core.{IsNot, IsEqual}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index 4135ab2..fc4a9ce 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichJoinFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 5ade21f..64fdc1f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 98bb446..08d82d2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunctio
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index 9e147b8..a9f420f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index d94f099..10bb7c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index 2cf3ab3..b0b3764 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 63ec2a4..48a33f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.runtime
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder