You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/13 15:44:06 UTC
[2/2] flink git commit: [streaming] Added proper StreamingMultipleProgramsBase
[streaming] Added proper StreamingMultipleProgramsBase
Also minor fixes for streaming complex integration test
Closes #520
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48e21a1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48e21a1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48e21a1a
Branch: refs/heads/master
Commit: 48e21a1ae5ac1e762aa670f53ef1a976bacabf8c
Parents: 4786b56
Author: mbalassi <mb...@apache.org>
Authored: Tue May 12 16:21:35 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed May 13 12:08:38 2015 +0200
----------------------------------------------------------------------
.../api/complex/ComplexIntegrationTest.java | 124 ++++++++++++-------
.../flink/streaming/util/RectangleClass.java | 43 -------
.../util/StreamingMultipleProgramsTestBase.java | 43 ++++++-
.../scala/table/test/AggregationsITCase.scala | 2 +-
.../flink/api/scala/table/test/AsITCase.scala | 2 +-
.../api/scala/table/test/CastingITCase.scala | 2 +-
.../scala/table/test/ExpressionsITCase.scala | 2 +-
.../api/scala/table/test/FilterITCase.scala | 2 +-
.../table/test/GroupedAggreagationsITCase.scala | 2 +-
.../flink/api/scala/table/test/JoinITCase.scala | 2 +-
.../api/scala/table/test/SelectITCase.scala | 2 +-
.../table/test/StringExpressionsITCase.scala | 2 +-
.../util/AbstractMultipleProgramsTestBase.java | 84 +++++++++++++
.../test/util/MultipleProgramsTestBase.java | 35 +-----
.../api/scala/actions/CountCollectITCase.scala | 2 +-
.../scala/functions/ClosureCleanerITCase.scala | 2 +-
.../scala/io/ScalaCsvReaderWithPOJOITCase.scala | 2 +-
.../api/scala/operators/AggregateITCase.scala | 2 +-
.../api/scala/operators/CoGroupITCase.scala | 2 +-
.../flink/api/scala/operators/CrossITCase.scala | 2 +-
.../api/scala/operators/DistinctITCase.scala | 2 +-
.../api/scala/operators/ExamplesITCase.scala | 2 +-
.../api/scala/operators/FilterITCase.scala | 2 +-
.../api/scala/operators/FirstNITCase.scala | 2 +-
.../api/scala/operators/FlatMapITCase.scala | 2 +-
.../scala/operators/GroupCombineITCase.scala | 2 +-
.../api/scala/operators/GroupReduceITCase.scala | 2 +-
.../flink/api/scala/operators/JoinITCase.scala | 2 +-
.../flink/api/scala/operators/MapITCase.scala | 2 +-
.../api/scala/operators/PartitionITCase.scala | 2 +-
.../api/scala/operators/ReduceITCase.scala | 2 +-
.../api/scala/operators/SumMinMaxITCase.scala | 2 +-
.../flink/api/scala/operators/UnionITCase.scala | 2 +-
.../scala/runtime/ScalaSpecialTypesITCase.scala | 2 +-
34 files changed, 236 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 74b5f1d..67c1387 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.RectangleClass;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
@@ -64,8 +63,11 @@ import java.util.HashMap;
import java.util.List;
@RunWith(Parameterized.class)
-public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase implements Serializable {
- private static final long serialVersionUID = 1L;
+public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+ // *************************************************************************
+ // GENERAL SETUP
+ // *************************************************************************
private String resultPath1;
private String resultPath2;
@@ -93,6 +95,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
compareResultsByLinesInMemory(expected2, resultPath2);
}
+ // *************************************************************************
+ // INTEGRATION TESTS
+ // *************************************************************************
+
@Test
public void complexIntegrationTest1() throws Exception {
//Testing data stream splitting with tuples
@@ -113,7 +119,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
//We create a separate environment for this test because of the slot-related to iteration issues.
StreamExecutionEnvironment env = new TestStreamEnvironment(4, 32); //StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.sum(0).setParallelism(1).filter(new FilterFunction
@@ -131,7 +136,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
step.select("firstOutput")
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
- step.select("secondOutput")//.print();
+ step.select("secondOutput")
.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
env.execute();
@@ -173,8 +178,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setBufferTimeout(100);
- env.setParallelism(4);
SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
@@ -215,7 +218,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
expected2 += "(" + 20000 + "," + 1 + ")";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
@@ -237,8 +239,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
return new Tuple2<Long, Integer>(value, 1);
}
})
-// .groupBy(0)
-// .sum(1)
.groupBy(0)
.window(Count.of(10000)).sum(1).flatten()
.filter(new FilterFunction<Tuple2<Long, Integer>>() {
@@ -269,7 +269,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
env.addSource(new RectangleSource())
.global()
@@ -303,7 +302,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
"12\n" + "15\n" + "16\n" + "20\n" + "25\n";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Long> dataStream51 = env.generateSequence(1, 5)
.map(new MapFunction<Long, Long>() {
@@ -345,7 +343,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
});
- dataStream53.merge(dataStream52)//.print();
+ dataStream53.merge(dataStream52)
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
env.execute();
@@ -445,7 +443,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
sourceStream6.window(Time.of(1, new Timestamp6()))
@@ -461,6 +458,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
}
+ // *************************************************************************
+ // FUNCTIONS
+ // *************************************************************************
+
private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
Double, Boolean>> {
@@ -473,36 +474,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
}
- public static class InnerPojo {
- public Long f0;
- public String f1;
-
- public InnerPojo(Long f0, String f1) {
- this.f0 = f0;
- this.f1 = f1;
- }
-
- @Override
- public String toString() {
- return "POJO(" + f0 + "," + f1 + ")";
- }
- }
-
- public static class OuterPojo {
- public InnerPojo f0;
- public Long f1;
-
- public OuterPojo(InnerPojo f0, Long f1) {
- this.f0 = f0;
- this.f1 = f1;
- }
-
- @Override
- public String toString() {
- return "POJO(" + f0 + "," + f1 + ")";
- }
- }
-
private static class PojoSource implements SourceFunction<OuterPojo> {
private static final long serialVersionUID = 1L;
@@ -746,4 +717,69 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
}
}
+ // *************************************************************************
+ // DATA TYPES
+ // *************************************************************************
+
+ //Flink Pojo
+ public static class InnerPojo {
+ public Long f0;
+ public String f1;
+
+ //default constructor to qualify as Flink POJO
+ InnerPojo(){}
+
+ public InnerPojo(Long f0, String f1) {
+ this.f0 = f0;
+ this.f1 = f1;
+ }
+
+ @Override
+ public String toString() {
+ return "POJO(" + f0 + "," + f1 + ")";
+ }
+ }
+
+ // Nested class serialized with Kryo
+ public static class OuterPojo {
+ public InnerPojo f0;
+ public Long f1;
+
+ public OuterPojo(InnerPojo f0, Long f1) {
+ this.f0 = f0;
+ this.f1 = f1;
+ }
+
+ @Override
+ public String toString() {
+ return "POJO(" + f0 + "," + f1 + ")";
+ }
+ }
+
+ public static class RectangleClass implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public int a;
+ public int b;
+
+ //default constructor to qualify as Flink POJO
+ public RectangleClass() {}
+
+ public RectangleClass(int a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ public RectangleClass next() {
+ return new RectangleClass(a + (b % 11), b + (a % 9));
+ }
+
+ @Override
+ public String toString() {
+ return "(" + a + "," + b + ")";
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
deleted file mode 100644
index 24fe1eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-
-public class RectangleClass implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- public int a;
- public int b;
-
- public RectangleClass(int a, int b) {
- this.a = a;
- this.b = b;
- }
-
- public RectangleClass next() {
- return new RectangleClass(a + (b % 11), b + (a % 9));
- }
-
- @Override
- public String toString() {
- return "(" + a + "," + b + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 55fc5e9..36e62f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,18 +18,55 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collection;
-public class StreamingMultipleProgramsTestBase extends MultipleProgramsTestBase {
+/**
+ * Base class for streaming unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the StreamExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ * @Test
+ * public void someTest() {
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * @Test
+ * public void anotherTest() {
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * }</pre>
+ */
+public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+
public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
super(mode);
+ switch(this.mode){
+ case CLUSTER:
+ TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+ clusterEnv.setAsContext();
+ break;
+ case COLLECTION:
+ throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
+ }
}
-
@Parameterized.Parameters(name = "Execution mode = {0}")
public static Collection<TestExecutionMode[]> executionModes() {
TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 9049f3c..38be85e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 536bad4..3a0cc69 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 4c0e624..9557985 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 9a52f8f..51dc428 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index d1f5485..982a302 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 30498fa..1f29722 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 53d1da0..17221d8 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 2396cea..6ba6c9f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index c5950bf..3f0f46f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
new file mode 100644
index 0000000..ef81dfe
--- /dev/null
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Abstract base class for unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the ExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ * @Test
+ * public void someTest() {
+ * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * @Test
+ * public void anotherTest() {
+ * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * }</pre>
+ */
+public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
+
+ /**
+ * Enum that defines which execution environment to run the next test on:
+ * An embedded local flink cluster, or the collection execution backend.
+ */
+ public enum TestExecutionMode {
+ CLUSTER,
+ COLLECTION
+ }
+
+ // -----------------------------------------------------------------------------------------...
+
+ private static final int DEFAULT_PARALLELISM = 4;
+
+ protected static ForkableFlinkMiniCluster cluster = null;
+
+ protected transient TestExecutionMode mode;
+
+ public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
+ this.mode = mode;
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception{
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 3e4bb33..e0c4360 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,8 +18,6 @@
package org.apache.flink.test.util;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
import java.util.Arrays;
@@ -53,28 +51,11 @@ import java.util.Collection;
*
* }</pre>
*/
-public class MultipleProgramsTestBase extends TestBaseUtils {
-
- /**
- * Enum that defines which execution environment to run the next test on:
- * An embedded local flink cluster, or the collection execution backend.
- */
- public enum TestExecutionMode {
- CLUSTER,
- COLLECTION
- }
-
- // -----------------------------------------------------------------------------------------...
-
- private static final int DEFAULT_PARALLELISM = 4;
-
- protected static ForkableFlinkMiniCluster cluster = null;
-
- protected transient TestExecutionMode mode;
+public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
public MultipleProgramsTestBase(TestExecutionMode mode){
- this.mode = mode;
- switch(mode){
+ super(mode);
+ switch(this.mode){
case CLUSTER:
TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
clusterEnv.setAsContext();
@@ -86,16 +67,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
}
}
- @BeforeClass
- public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
- }
-
- @AfterClass
- public static void teardown() throws Exception {
- stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
- }
-
@Parameterized.Parameters(name = "Execution mode = {0}")
public static Collection<TestExecutionMode[]> executionModes(){
return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
index 2d2cdd3..d19f543 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -19,8 +19,8 @@
package org.apache.flink.api.scala.actions
import org.apache.flink.api.scala._
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit.Test
import org.junit.Assert._
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 2cae79c..e9b4ffe 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.functions
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Assert.fail
import org.junit.{After, Before, Test, Rule}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
index 21aa93d..7e395d9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit.Assert._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 484226d..432a6d4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 3379fe2..eaf4117 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index a8611c8..512ec6c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 31c6052..8c10271 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index 2bccc5b..7cf802c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 8336ae3..082201b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index 9c59206..36cc3a7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index 4a66b80..61751da 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.junit.{Test, After, Before, Rule}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 380b3bc..1b40205 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.test.javaApiOperators.GroupCombineITCase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.junit._
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index fe9e3f3..b832647 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -28,7 +28,7 @@ CustomType}
import org.apache.flink.optimizer.Optimizer
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.hamcrest.core.{IsNot, IsEqual}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index 4135ab2..fc4a9ce 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichJoinFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 5ade21f..64fdc1f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 98bb446..08d82d2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunctio
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index 9e147b8..a9f420f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index d94f099..10bb7c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index 2cf3ab3..b0b3764 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 63ec2a4..48a33f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.runtime
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder