You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/13 15:44:06 UTC

[2/2] flink git commit: [streaming] Added proper StreamingMultipleProgramsBase

[streaming] Added proper StreamingMultipleProgramsBase

Also minor fixes for streaming complex integration test
Closes #520


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48e21a1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48e21a1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48e21a1a

Branch: refs/heads/master
Commit: 48e21a1ae5ac1e762aa670f53ef1a976bacabf8c
Parents: 4786b56
Author: mbalassi <mb...@apache.org>
Authored: Tue May 12 16:21:35 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed May 13 12:08:38 2015 +0200

----------------------------------------------------------------------
 .../api/complex/ComplexIntegrationTest.java     | 124 ++++++++++++-------
 .../flink/streaming/util/RectangleClass.java    |  43 -------
 .../util/StreamingMultipleProgramsTestBase.java |  43 ++++++-
 .../scala/table/test/AggregationsITCase.scala   |   2 +-
 .../flink/api/scala/table/test/AsITCase.scala   |   2 +-
 .../api/scala/table/test/CastingITCase.scala    |   2 +-
 .../scala/table/test/ExpressionsITCase.scala    |   2 +-
 .../api/scala/table/test/FilterITCase.scala     |   2 +-
 .../table/test/GroupedAggreagationsITCase.scala |   2 +-
 .../flink/api/scala/table/test/JoinITCase.scala |   2 +-
 .../api/scala/table/test/SelectITCase.scala     |   2 +-
 .../table/test/StringExpressionsITCase.scala    |   2 +-
 .../util/AbstractMultipleProgramsTestBase.java  |  84 +++++++++++++
 .../test/util/MultipleProgramsTestBase.java     |  35 +-----
 .../api/scala/actions/CountCollectITCase.scala  |   2 +-
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 .../scala/io/ScalaCsvReaderWithPOJOITCase.scala |   2 +-
 .../api/scala/operators/AggregateITCase.scala   |   2 +-
 .../api/scala/operators/CoGroupITCase.scala     |   2 +-
 .../flink/api/scala/operators/CrossITCase.scala |   2 +-
 .../api/scala/operators/DistinctITCase.scala    |   2 +-
 .../api/scala/operators/ExamplesITCase.scala    |   2 +-
 .../api/scala/operators/FilterITCase.scala      |   2 +-
 .../api/scala/operators/FirstNITCase.scala      |   2 +-
 .../api/scala/operators/FlatMapITCase.scala     |   2 +-
 .../scala/operators/GroupCombineITCase.scala    |   2 +-
 .../api/scala/operators/GroupReduceITCase.scala |   2 +-
 .../flink/api/scala/operators/JoinITCase.scala  |   2 +-
 .../flink/api/scala/operators/MapITCase.scala   |   2 +-
 .../api/scala/operators/PartitionITCase.scala   |   2 +-
 .../api/scala/operators/ReduceITCase.scala      |   2 +-
 .../api/scala/operators/SumMinMaxITCase.scala   |   2 +-
 .../flink/api/scala/operators/UnionITCase.scala |   2 +-
 .../scala/runtime/ScalaSpecialTypesITCase.scala |   2 +-
 34 files changed, 236 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 74b5f1d..67c1387 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.RectangleClass;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
@@ -64,8 +63,11 @@ import java.util.HashMap;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase implements Serializable {
-	private static final long serialVersionUID = 1L;
+public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+	// *************************************************************************
+	// GENERAL SETUP
+	// *************************************************************************
 
 	private String resultPath1;
 	private String resultPath2;
@@ -93,6 +95,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		compareResultsByLinesInMemory(expected2, resultPath2);
 	}
 
+	// *************************************************************************
+	// INTEGRATION TESTS
+	// *************************************************************************
+
 	@Test
 	public void complexIntegrationTest1() throws Exception {
 		//Testing data stream splitting with tuples
@@ -113,7 +119,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 		//We create a separate environment for this test because of the slot-related to iteration issues.
 		StreamExecutionEnvironment env = new TestStreamEnvironment(4, 32); //StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 		DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
 
 		IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.sum(0).setParallelism(1).filter(new FilterFunction
@@ -131,7 +136,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 		step.select("firstOutput")
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-		step.select("secondOutput")//.print();
+		step.select("secondOutput")
 				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
 
 		env.execute();
@@ -173,8 +178,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setBufferTimeout(100);
-		env.setParallelism(4);
 
 		SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
 		DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
@@ -215,7 +218,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		expected2 += "(" + 20000 + "," + 1 + ")";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
 		DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
@@ -237,8 +239,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				return new Tuple2<Long, Integer>(value, 1);
 			}
 		})
-//				.groupBy(0)
-//				.sum(1)
 				.groupBy(0)
 				.window(Count.of(10000)).sum(1).flatten()
 				.filter(new FilterFunction<Tuple2<Long, Integer>>() {
@@ -269,7 +269,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		env.addSource(new RectangleSource())
 				.global()
@@ -303,7 +302,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 				"12\n" + "15\n" + "16\n" + "20\n" + "25\n";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		DataStream<Long> dataStream51 = env.generateSequence(1, 5)
 				.map(new MapFunction<Long, Long>() {
@@ -345,7 +343,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		});
 
 
-		dataStream53.merge(dataStream52)//.print();
+		dataStream53.merge(dataStream52)
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		env.execute();
@@ -445,7 +443,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(4);
 
 		DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
 		sourceStream6.window(Time.of(1, new Timestamp6()))
@@ -461,6 +458,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 	}
 
+	// *************************************************************************
+	// FUNCTIONS
+	// *************************************************************************
+
 	private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
 			Double, Boolean>> {
 
@@ -473,36 +474,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 
 	}
 
-	public static class InnerPojo {
-		public Long f0;
-		public String f1;
-
-		public InnerPojo(Long f0, String f1) {
-			this.f0 = f0;
-			this.f1 = f1;
-		}
-
-		@Override
-		public String toString() {
-			return "POJO(" + f0 + "," + f1 + ")";
-		}
-	}
-
-	public static class OuterPojo {
-		public InnerPojo f0;
-		public Long f1;
-
-		public OuterPojo(InnerPojo f0, Long f1) {
-			this.f0 = f0;
-			this.f1 = f1;
-		}
-
-		@Override
-		public String toString() {
-			return "POJO(" + f0 + "," + f1 + ")";
-		}
-	}
-
 	private static class PojoSource implements SourceFunction<OuterPojo> {
 		private static final long serialVersionUID = 1L;
 
@@ -746,4 +717,69 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
 		}
 	}
 
+	// *************************************************************************
+	// DATA TYPES
+	// *************************************************************************
+
+	//Flink Pojo
+	public static class InnerPojo {
+		public Long f0;
+		public String f1;
+
+		//default constructor to qualify as Flink POJO
+		InnerPojo(){}
+
+		public InnerPojo(Long f0, String f1) {
+			this.f0 = f0;
+			this.f1 = f1;
+		}
+
+		@Override
+		public String toString() {
+			return "POJO(" + f0 + "," + f1 + ")";
+		}
+	}
+
+	// Nested class serialized with Kryo
+	public static class OuterPojo {
+		public InnerPojo f0;
+		public Long f1;
+
+		public OuterPojo(InnerPojo f0, Long f1) {
+			this.f0 = f0;
+			this.f1 = f1;
+		}
+
+		@Override
+		public String toString() {
+			return "POJO(" + f0 + "," + f1 + ")";
+		}
+	}
+
+	public static class RectangleClass implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int a;
+		public int b;
+
+		//default constructor to qualify as Flink POJO
+		public RectangleClass() {}
+
+		public RectangleClass(int a, int b) {
+			this.a = a;
+			this.b = b;
+		}
+
+		public RectangleClass next() {
+			return new RectangleClass(a + (b % 11), b + (a % 9));
+		}
+
+		@Override
+		public String toString() {
+			return "(" + a + "," + b + ")";
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
deleted file mode 100644
index 24fe1eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-
-public class RectangleClass implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	public int a;
-	public int b;
-
-	public RectangleClass(int a, int b) {
-		this.a = a;
-		this.b = b;
-	}
-
-	public RectangleClass next() {
-		return new RectangleClass(a + (b % 11), b + (a % 9));
-	}
-
-	@Override
-	public String toString() {
-		return "(" + a + "," + b + ")";
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 55fc5e9..36e62f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,18 +18,55 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
 import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
-public class StreamingMultipleProgramsTestBase extends MultipleProgramsTestBase {
+/**
+ * Base class for streaming unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the StreamExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ *   @Test
+ *   public void someTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   @Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * }</pre>
+ */
+public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+
 	public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
 		super(mode);
+		switch(this.mode){
+			case CLUSTER:
+				TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+				clusterEnv.setAsContext();
+				break;
+			case COLLECTION:
+				throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
+		}
 	}
 
-
 	@Parameterized.Parameters(name = "Execution mode = {0}")
 	public static Collection<TestExecutionMode[]> executionModes() {
 		TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 9049f3c..38be85e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 536bad4..3a0cc69 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 4c0e624..9557985 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 9a52f8f..51dc428 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index d1f5485..982a302 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 30498fa..1f29722 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 53d1da0..17221d8 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 2396cea..6ba6c9f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index c5950bf..3f0f46f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
new file mode 100644
index 0000000..ef81dfe
--- /dev/null
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Abstract base class for unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the ExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ *   @Test
+ *   public void someTest() {
+ *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   @Test
+ *   public void anotherTest() {
+ *       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * }</pre>
+ */
+public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
+
+	/**
+	 * Enum that defines which execution environment to run the next test on:
+	 * An embedded local flink cluster, or the collection execution backend.
+	 */
+	public enum TestExecutionMode {
+		CLUSTER,
+		COLLECTION
+	}
+
+	// -----------------------------------------------------------------------------------------...
+
+	private static final int DEFAULT_PARALLELISM = 4;
+
+	protected static ForkableFlinkMiniCluster cluster = null;
+
+	protected transient TestExecutionMode mode;
+
+	public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
+		this.mode = mode;
+	}
+
+	@BeforeClass
+	public static void setup() throws Exception{
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 3e4bb33..e0c4360 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
@@ -53,28 +51,11 @@ import java.util.Collection;
  *
  * }</pre>
  */
-public class MultipleProgramsTestBase extends TestBaseUtils {
-
-	/**
-	 * Enum that defines which execution environment to run the next test on:
-	 * An embedded local flink cluster, or the collection execution backend.
-	 */
-	public enum TestExecutionMode {
-		CLUSTER,
-		COLLECTION
-	}
-
-	// -----------------------------------------------------------------------------------------...
-
-	private static final int DEFAULT_PARALLELISM = 4;
-
-	protected static ForkableFlinkMiniCluster cluster = null;
-
-	protected transient TestExecutionMode mode;
+public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
 
 	public MultipleProgramsTestBase(TestExecutionMode mode){
-		this.mode = mode;
-		switch(mode){
+		super(mode);
+		switch(this.mode){
 			case CLUSTER:
 				TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
 				clusterEnv.setAsContext();
@@ -86,16 +67,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 		}
 	}
 
-	@BeforeClass
-	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-
 	@Parameterized.Parameters(name = "Execution mode = {0}")
 	public static Collection<TestExecutionMode[]> executionModes(){
 		return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
index 2d2cdd3..d19f543 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.api.scala.actions
 
 import org.apache.flink.api.scala._
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 
 import org.junit.Test
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 2cae79c..e9b4ffe 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.functions
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.junit.Assert.fail
 import org.junit.{After, Before, Test, Rule}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
index 21aa93d..7e395d9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.junit.Assert._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 484226d..432a6d4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 3379fe2..eaf4117 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index a8611c8..512ec6c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 31c6052..8c10271 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index 2bccc5b..7cf802c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 8336ae3..082201b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index 9c59206..36cc3a7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index 4a66b80..61751da 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.junit.{Test, After, Before, Rule}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 380b3bc..1b40205 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.test.javaApiOperators.GroupCombineITCase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.util.Collector
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index fe9e3f3..b832647 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -28,7 +28,7 @@ CustomType}
 import org.apache.flink.optimizer.Optimizer
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.apache.flink.util.Collector
 import org.hamcrest.core.{IsNot, IsEqual}

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index 4135ab2..fc4a9ce 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichJoinFunction
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 5ade21f..64fdc1f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 98bb446..08d82d2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunctio
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index 9e147b8..a9f420f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit.{Test, After, Before, Rule}
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index d94f099..10bb7c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index 2cf3ab3..b0b3764 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder

http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 63ec2a4..48a33f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.runtime
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{MultipleProgramsTestBase}
 import org.junit._
 import org.junit.rules.TemporaryFolder