You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/13 15:44:05 UTC
[1/2] flink git commit: [FLINK-1595] [streaming] Added complex
streaming integration test
Repository: flink
Updated Branches:
refs/heads/master 73493335f -> 48e21a1ae
[FLINK-1595] [streaming] Added complex streaming integration test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4786b56e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4786b56e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4786b56e
Branch: refs/heads/master
Commit: 4786b56e588c9812c635e1b4d9c28af68041f754
Parents: 7349333
Author: szape <ne...@gmail.com>
Authored: Tue Apr 21 11:01:08 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed May 13 09:51:43 2015 +0200
----------------------------------------------------------------------
.../SlidingTimeGroupedPreReducer.java | 8 +-
.../windowbuffer/SlidingTimePreReducer.java | 8 +-
.../api/complex/ComplexIntegrationTest.java | 749 +++++++++++++++++++
.../flink/streaming/util/RectangleClass.java | 43 ++
.../util/StreamingMultipleProgramsTestBase.java | 40 +
5 files changed, 842 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
index 3724ce5..cdb4207 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
@@ -69,9 +69,11 @@ public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T>
@Override
protected void afterEmit() {
- long lastTime = timestampWrapper.getTimestamp(lastStored);
- if (lastTime - windowStartTime >= slideSize) {
- windowStartTime = windowStartTime + slideSize;
+ if (lastStored != null) {
+ long lastTime = timestampWrapper.getTimestamp(lastStored);
+ if (lastTime - windowStartTime >= slideSize) {
+ windowStartTime = windowStartTime + slideSize;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index 2382b48..7652d81 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -68,9 +68,11 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
@Override
protected void afterEmit() {
- long lastTime = timestampWrapper.getTimestamp(lastStored);
- if (lastTime - windowStartTime >= slideSize) {
- windowStartTime = windowStartTime + slideSize;
+ if (lastStored != null) {
+ long lastTime = timestampWrapper.getTimestamp(lastStored);
+ if (lastTime - windowStartTime >= slideSize) {
+ windowStartTime = windowStartTime + slideSize;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
new file mode 100644
index 0000000..74b5f1d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -0,0 +1,749 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.complex;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.RectangleClass;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String resultPath1;
+ private String resultPath2;
+ private String expected1;
+ private String expected2;
+
+ public ComplexIntegrationTest(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception {
+ resultPath1 = tempFolder.newFile().toURI().toString();
+ resultPath2 = tempFolder.newFile().toURI().toString();
+ expected1 = "";
+ expected2 = "";
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected1, resultPath1);
+ compareResultsByLinesInMemory(expected2, resultPath2);
+ }
+
+ @Test
+ public void complexIntegrationTest1() throws Exception {
+ //Testing data stream splitting with tuples
+
+ expected1 = "";
+ for (int i = 0; i < 8; i++) {
+ expected1 += "(10,(a,1))\n";
+ }
+ //i == 8
+ expected1 += "(10,(a,1))";
+
+ expected2 = "";
+ for (int i = 0; i < 18; i++) {
+ expected2 += "(20,(a,1))\n";
+ }
+ //i == 18
+ expected2 += "(20,(a,1))";
+
+ //We create a separate environment for this test because of the slot-related to iteration issues.
+ StreamExecutionEnvironment env = new TestStreamEnvironment(4, 32); //StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
+
+ IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.sum(0).setParallelism(1).filter(new FilterFunction
+ <Tuple2<Long, Tuple2<String, Long>>>() {
+
+ @Override
+ public boolean filter(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
+ return value.f0 < 20;
+ }
+ }).iterate(5000);
+
+ SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new
+ MyOutputSelector());
+ it.closeWith(step.select("iterate"));
+
+ step.select("firstOutput")
+ .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+ step.select("secondOutput")//.print();
+ .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void complexIntegrationTest2() throws Exception {
+ //Testing POJO source, grouping by multiple filds and windowing with timestamp
+
+ expected1 = "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+ "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+ "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+ "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
+ "water_melon-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" +
+ "orange-b\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-d\n" + "orange-d\n" +
+ "peach-d\n" + "peach-d\n";
+
+ List<Tuple5<Integer, String, Character, Double, Boolean>> input = Arrays.asList(
+ new Tuple5<Integer, String, Character, Double, Boolean>(1, "apple", 'j', 0.1, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(1, "peach", 'b', 0.8, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(1, "orange", 'c', 0.7, true),
+ new Tuple5<Integer, String, Character, Double, Boolean>(2, "apple", 'd', 0.5, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(2, "peach", 'j', 0.6, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(3, "orange", 'b', 0.2, true),
+ new Tuple5<Integer, String, Character, Double, Boolean>(6, "apple", 'c', 0.1, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(7, "peach", 'd', 0.4, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(8, "orange", 'j', 0.2, true),
+ new Tuple5<Integer, String, Character, Double, Boolean>(10, "apple", 'b', 0.1, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(10, "peach", 'c', 0.5, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(11, "orange", 'd', 0.3, true),
+ new Tuple5<Integer, String, Character, Double, Boolean>(11, "apple", 'j', 0.3, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(12, "peach", 'b', 0.9, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(13, "orange", 'c', 0.7, true),
+ new Tuple5<Integer, String, Character, Double, Boolean>(15, "apple", 'd', 0.2, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(16, "peach", 'j', 0.8, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(16, "orange", 'b', 0.8, true),
+ new Tuple5<Integer, String, Character, Double, Boolean>(16, "apple", 'c', 0.1, false),
+ new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setBufferTimeout(100);
+ env.setParallelism(4);
+
+ SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
+ DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
+
+ sourceStream21
+ .sum(3)
+ .groupBy(2, 2)
+ .window(Time.of(10, new MyTimestamp(), 0))
+ .every(Time.of(4, new MyTimestamp(), 0))
+ .maxBy(3)
+ .flatten()
+ .map(new MyMapFunction2())
+ .flatMap(new MyFlatMapFunction())
+ .connect(sourceStream22)
+ .map(new MyCoMapFunction())
+ .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void complexIntegrationTest3() throws Exception {
+ //Heavy prime factorisation with maps and flatmaps
+
+ expected1 = "541\n" + "1223\n" + "3319\n" + "5851\n" + "1987\n" + "8387\n" + "15907\n" + "10939\n" +
+ "4127\n" + "2477\n" + "6737\n" + "13421\n" + "4987\n" + "4999\n" + "18451\n" + "9283\n" + "7499\n" +
+ "16937\n" + "11927\n" + "9973\n" + "14431\n" + "19507\n" + "12497\n" + "17497\n" + "14983\n" +
+ "19997\n";
+
+ for (int i = 2; i < 100; i++) {
+ expected2 += "(" + i + "," + 20000 / i + ")\n";
+ }
+ for (int i = 19901; i < 20000; i++) {
+ expected2 += "(" + i + "," + 20000 / i + ")\n";
+ }
+ //i == 20000
+ expected2 += "(" + 20000 + "," + 1 + ")";
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
+ DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
+
+ sourceStream31.filter(new PrimeFilterFunction())
+ .window(Count.of(100))
+ .max(0).flatten()
+ .merge(sourceStream32.filter(new PrimeFilterFunction())
+ .window(Count.of(100))
+ .max(0).flatten())
+ .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+ sourceStream31.flatMap(new DivisorsFlatMapFunction())
+ .merge(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
+ Integer>>() {
+
+ @Override
+ public Tuple2<Long, Integer> map(Long value) throws Exception {
+ return new Tuple2<Long, Integer>(value, 1);
+ }
+ })
+// .groupBy(0)
+// .sum(1)
+ .groupBy(0)
+ .window(Count.of(10000)).sum(1).flatten()
+ .filter(new FilterFunction<Tuple2<Long, Integer>>() {
+
+ @Override
+ public boolean filter(Tuple2<Long, Integer> value) throws Exception {
+ return value.f0 < 100 || value.f0 > 19900;
+ }
+ })
+ .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+ @Test
+ public void complexIntegrationTest4() throws Exception {
+ //Testing mapping and delta-policy windowing with custom class
+
+ expected1 = "((100,100),0)\n" + "((120,122),5)\n" + "((121,125),6)\n" + "((138,144),9)\n" +
+ "((139,147),10)\n" + "((156,166),13)\n" + "((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" +
+ "((192,210),21)\n" + "((193,213),22)\n" + "((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" +
+ "((229,257),30)\n" + "((246,276),33)\n" + "((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" +
+ "((282,320),41)\n" + "((283,323),42)\n" + "((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" +
+ "((319,367),50)\n" + "((336,386),53)\n" + "((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" +
+ "((372,430),61)\n" + "((373,433),62)\n" + "((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" +
+ "((409,477),70)\n" + "((426,496),73)\n" + "((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" +
+ "((462,540),81)\n" + "((463,543),82)\n" + "((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" +
+ "((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ env.addSource(new RectangleSource())
+ .global()
+ .map(new RectangleMapFunction())
+ .window(Delta.of(0.0, new MyDelta(), new Tuple2<RectangleClass, Integer>(new RectangleClass(100, 100), 0)))
+ .mapWindow(new MyWindowMapFunction())
+ .flatten()
+ .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+ private static class MyDelta implements DeltaFunction<Tuple2<RectangleClass, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public double getDelta(Tuple2<RectangleClass, Integer> oldDataPoint, Tuple2<RectangleClass,
+ Integer> newDataPoint) {
+ return (newDataPoint.f0.b - newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a);
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void complexIntegrationTest5() throws Exception {
+ //Turning on and off chaining
+
+ expected1 = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" + "5\n" + "5\n" +
+ "5\n" + "5\n" + "5\n" + "1\n" + "3\n" + "3\n" + "4\n" + "5\n" + "5\n" + "6\n" + "8\n" + "9\n" + "10\n" +
+ "12\n" + "15\n" + "16\n" + "20\n" + "25\n";
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ DataStream<Long> dataStream51 = env.generateSequence(1, 5)
+ .map(new MapFunction<Long, Long>() {
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return value;
+ }
+ }).startNewChain()
+ .filter(new FilterFunction<Long>() {
+
+ @Override
+ public boolean filter(Long value) throws Exception {
+ return true;
+ }
+ }).disableChaining()
+ .flatMap(new SquareFlatMapFunction());
+
+ DataStream<Long> dataStream52 = dataStream51.fold(0L, new FoldFunction<Long, Long>() {
+
+ @Override
+ public Long fold(Long accumulator, Long value) throws Exception {
+ return accumulator + value;
+ }
+ }).map(new MapFunction<Long, Long>() {
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return value;
+ }
+ }).disableChaining();
+
+ DataStream<Long> dataStream53 = dataStream51.map(new MapFunction<Long, Long>() {
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return value;
+ }
+ });
+
+
+ dataStream53.merge(dataStream52)//.print();
+ .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+ }
+
+
+ @Test
+ public void complexIntegrationTest6() throws Exception {
+ //Testing java collections and date-time types
+
+ expected1 = "(6,(a,6))\n" + "(6,(b,3))\n" + "(6,(c,4))\n" + "(6,(d,2))\n" + "(6,(f,2))\n" +
+ "(7,(a,1))\n" + "(7,(b,2))\n" + "(7,(c,3))\n" + "(7,(d,1))\n" + "(7,(e,1))\n" + "(7,(f,1))\n" +
+ "(8,(a,6))\n" + "(8,(b,4))\n" + "(8,(c,5))\n" + "(8,(d,1))\n" + "(8,(e,2))\n" + "(8,(f,2))\n" +
+ "(9,(a,4))\n" + "(9,(b,4))\n" + "(9,(c,7))\n" + "(9,(d,3))\n" + "(9,(e,1))\n" + "(9,(f,2))\n" +
+ "(10,(a,3))\n" + "(10,(b,2))\n" + "(10,(c,3))\n" + "(10,(d,2))\n" + "(10,(e,1))\n" + "(10,(f,1))";
+ expected2 = "[a, a, c, c, d, f]\n" + "[a, b, b, d]\n" + "[a, a, a, b, c, c, f]\n" + "[a, d, e]\n" +
+ "[b, b, c, c, c, f]\n" + "[a, a, a, a, b, b, c, c, e]\n" + "[a, a, b, b, c, c, c, d, e, f, f]\n" +
+ "[a, a, a, b, c, c, c, d, d, f]\n" + "[a, b, b, b, c, c, c, c, d, e, f]\n" +
+ "[a, a, a, b, b, c, c, c, d, d, e, f]";
+
+ SimpleDateFormat ft = new SimpleDateFormat("dd-MM-yyyy");
+
+ ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<Tuple2<Date, HashMap<Character,
+ Integer>>>();
+ HashMap<Character, Integer> sale1 = new HashMap<Character, Integer>();
+ sale1.put('a', 2);
+ sale1.put('c', 2);
+ sale1.put('d', 1);
+ sale1.put('f', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("03-06-2014"), sale1));
+
+ HashMap<Character, Integer> sale2 = new HashMap<Character, Integer>();
+ sale2.put('a', 1);
+ sale2.put('b', 2);
+ sale2.put('d', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("10-06-2014"), sale2));
+
+ HashMap<Character, Integer> sale3 = new HashMap<Character, Integer>();
+ sale3.put('a', 3);
+ sale3.put('b', 1);
+ sale3.put('c', 2);
+ sale3.put('f', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("29-06-2014"), sale3));
+
+ HashMap<Character, Integer> sale4 = new HashMap<Character, Integer>();
+ sale4.put('a', 1);
+ sale4.put('d', 1);
+ sale4.put('e', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("15-07-2014"), sale4));
+
+ HashMap<Character, Integer> sale5 = new HashMap<Character, Integer>();
+ sale5.put('b', 2);
+ sale5.put('c', 3);
+ sale5.put('f', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("24-07-2014"), sale5));
+
+ HashMap<Character, Integer> sale6 = new HashMap<Character, Integer>();
+ sale6.put('a', 4);
+ sale6.put('b', 2);
+ sale6.put('c', 2);
+ sale6.put('e', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("17-08-2014"), sale6));
+
+ HashMap<Character, Integer> sale7 = new HashMap<Character, Integer>();
+ sale7.put('a', 2);
+ sale7.put('b', 2);
+ sale7.put('c', 3);
+ sale7.put('d', 1);
+ sale7.put('e', 1);
+ sale7.put('f', 2);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("27-08-2014"), sale7));
+
+ HashMap<Character, Integer> sale8 = new HashMap<Character, Integer>();
+ sale8.put('a', 3);
+ sale8.put('b', 1);
+ sale8.put('c', 3);
+ sale8.put('d', 2);
+ sale8.put('f', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("16-09-2014"), sale8));
+
+ HashMap<Character, Integer> sale9 = new HashMap<Character, Integer>();
+ sale9.put('a', 1);
+ sale9.put('b', 3);
+ sale9.put('c', 4);
+ sale9.put('d', 1);
+ sale9.put('e', 1);
+ sale9.put('f', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("25-09-2014"), sale9));
+
+ HashMap<Character, Integer> sale10 = new HashMap<Character, Integer>();
+ sale10.put('a', 3);
+ sale10.put('b', 2);
+ sale10.put('c', 3);
+ sale10.put('d', 2);
+ sale10.put('e', 1);
+ sale10.put('f', 1);
+ sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
+ sourceStream6.window(Time.of(1, new Timestamp6()))
+ .reduceWindow(new SalesReduceFunction())
+ .flatten()
+ .flatMap(new FlatMapFunction6())
+ .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+ sourceStream6.map(new MapFunction6())
+ .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ }
+
+ private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
+ Double, Boolean>> {
+
+ @Override
+ public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double,
+ Boolean> value) throws Exception {
+ return new Tuple4<Integer, String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2,
+ value.f3, value.f4);
+ }
+
+ }
+
+ public static class InnerPojo {
+ public Long f0;
+ public String f1;
+
+ public InnerPojo(Long f0, String f1) {
+ this.f0 = f0;
+ this.f1 = f1;
+ }
+
+ @Override
+ public String toString() {
+ return "POJO(" + f0 + "," + f1 + ")";
+ }
+ }
+
+ public static class OuterPojo {
+ public InnerPojo f0;
+ public Long f1;
+
+ public OuterPojo(InnerPojo f0, Long f1) {
+ this.f0 = f0;
+ this.f1 = f1;
+ }
+
+ @Override
+ public String toString() {
+ return "POJO(" + f0 + "," + f1 + ")";
+ }
+ }
+
+ private static class PojoSource implements SourceFunction<OuterPojo> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(Collector<OuterPojo> collector) throws Exception {
+ for (long i = 0; i < 20; i++) {
+ collector.collect(new OuterPojo(new InnerPojo(i / 2, "water_melon-b"), 2L));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // no cleanup needed
+ }
+ }
+
+ private static class TupleSource implements SourceFunction<Tuple2<Long, Tuple2<String, Long>>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(Collector<Tuple2<Long, Tuple2<String, Long>>> collector) throws Exception {
+ for (int i = 0; i < 20; i++) {
+ collector.collect(new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L)));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // no cleanup needed
+ }
+ }
+
+ private class IncrementMap implements MapFunction<Tuple2<Long, Tuple2<String, Long>>, Tuple2<Long, Tuple2<String,
+ Long>>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
+ return new Tuple2<Long, Tuple2<String, Long>>(value.f0 + 1, value.f1);
+ }
+ }
+
+ private static class MyTimestamp implements Timestamp<Tuple5<Integer, String, Character, Double, Boolean>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value) {
+ return (long) value.f0;
+ }
+ }
+
+ private static class MyFlatMapFunction implements FlatMapFunction<Tuple4<Integer, String, Double,
+ Boolean>, OuterPojo> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Tuple4<Integer, String, Double, Boolean> value, Collector<OuterPojo> out) throws
+ Exception {
+ if (value.f3) {
+ for (int i = 0; i < 2; i++) {
+ out.collect(new OuterPojo(new InnerPojo((long) value.f0, value.f1), (long) i));
+ }
+ }
+ }
+ }
+
+ private class MyCoMapFunction implements CoMapFunction<OuterPojo, OuterPojo, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map1(OuterPojo value) {
+ return value.f0.f1;
+ }
+
+ @Override
+ public String map2(OuterPojo value) {
+ return value.f0.f1;
+ }
+ }
+
+ private class MyOutputSelector implements OutputSelector<Tuple2<Long, Tuple2<String, Long>>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Iterable<String> select(Tuple2<Long, Tuple2<String, Long>> value) {
+ List<String> output = new ArrayList<String>();
+ if (value.f0 == 10) {
+ output.add("iterate");
+ output.add("firstOutput");
+ } else if (value.f0 == 20) {
+ output.add("secondOutput");
+ } else {
+ output.add("iterate");
+ }
+ return output;
+ }
+ }
+
+ private static class PrimeFilterFunction implements FilterFunction<Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Long value) throws Exception {
+ if (value < 2) {
+ return false;
+ } else {
+ for (long i = 2; i < value; i++) {
+ if (value % i == 0) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ private static class DivisorsFlatMapFunction implements FlatMapFunction<Long, Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Long value, Collector<Long> out) throws Exception {
+ for (long i = 2; i <= value; i++) {
+ if (value % i == 0) {
+ out.collect(i);
+ }
+ }
+ }
+ }
+
+ private static class RectangleSource implements SourceFunction<RectangleClass> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(Collector<RectangleClass> collector) throws Exception {
+ RectangleClass rectangle = new RectangleClass(100, 100);
+
+ for (int i = 0; i < 100; i++) {
+ collector.collect(rectangle);
+ rectangle = rectangle.next();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // no cleanup needed
+ }
+ }
+
+ private static class RectangleMapFunction implements MapFunction<RectangleClass, Tuple2<RectangleClass, Integer>> {
+ private static final long serialVersionUID = 1L;
+ private int counter = 0;
+
+ @Override
+ public Tuple2<RectangleClass, Integer> map(RectangleClass value) throws Exception {
+ return new Tuple2<RectangleClass, Integer>(value, counter++);
+ }
+ }
+
+ private static class MyWindowMapFunction implements WindowMapFunction<Tuple2<RectangleClass, Integer>,
+ Tuple2<RectangleClass, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void mapWindow(Iterable<Tuple2<RectangleClass, Integer>> values, Collector<Tuple2<RectangleClass,
+ Integer>> out) throws Exception {
+ out.collect(values.iterator().next());
+ }
+ }
+
+ private static class SquareFlatMapFunction implements FlatMapFunction<Long, Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Long value, Collector<Long> out) throws Exception {
+ for (long i = 0; i < value; i++) {
+ out.collect(value);
+ }
+ }
+ }
+
+ private static class Timestamp6 implements Timestamp<Tuple2<Date, HashMap<Character, Integer>>> {
+
+ @Override
+ public long getTimestamp(Tuple2<Date, HashMap<Character, Integer>> value) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(value.f0);
+ return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH);
+ }
+ }
+
+ private static class SalesReduceFunction implements ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Date, HashMap<Character, Integer>> reduce(Tuple2<Date, HashMap<Character, Integer>> value1,
+ Tuple2<Date,
+ HashMap<Character, Integer>> value2) throws Exception {
+ HashMap<Character, Integer> map1 = value1.f1;
+ HashMap<Character, Integer> map2 = value2.f1;
+ for (Character key : map2.keySet()) {
+ Integer volume1 = map1.get(key);
+ Integer volume2 = map2.get(key);
+ if (volume1 == null) {
+ volume1 = 0;
+ }
+ map1.put(key, volume1 + volume2);
+ }
+ return new Tuple2<Date, HashMap<Character, Integer>>(value2.f0, map1);
+ }
+ }
+
+ private static class FlatMapFunction6 implements FlatMapFunction<Tuple2<Date, HashMap<Character, Integer>>, Tuple2<Integer,
+ Tuple2<Character, Integer>>> {
+
+ @Override
+ public void flatMap(Tuple2<Date, HashMap<Character, Integer>> value, Collector<Tuple2<Integer,
+ Tuple2<Character, Integer>>> out) throws Exception {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(value.f0);
+ for (Character key : value.f1.keySet()) {
+ out.collect(new Tuple2<Integer, Tuple2<Character, Integer>>(cal.get(Calendar.MONTH)
+ + 1,
+ new Tuple2<Character, Integer>(key, value.f1.get(key))));
+ }
+ }
+ }
+
+ private static class MapFunction6 implements MapFunction<Tuple2<Date, HashMap<Character, Integer>>, ArrayList<Character>> {
+
+ @Override
+ public ArrayList<Character> map(Tuple2<Date, HashMap<Character, Integer>> value)
+ throws Exception {
+ ArrayList<Character> list = new ArrayList<Character>();
+ for (Character ch : value.f1.keySet()) {
+ for (int i = 0; i < value.f1.get(ch); i++) {
+ list.add(ch);
+ }
+ }
+ Collections.sort(list);
+ return list;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
new file mode 100644
index 0000000..24fe1eb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.Serializable;
+
+public class RectangleClass implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public int a;
+ public int b;
+
+ public RectangleClass(int a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ public RectangleClass next() {
+ return new RectangleClass(a + (b % 11), b + (a % 9));
+ }
+
+ @Override
+ public String toString() {
+ return "(" + a + "," + b + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4786b56e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
new file mode 100644
index 0000000..55fc5e9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class StreamingMultipleProgramsTestBase extends MultipleProgramsTestBase {
+ public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+
+ @Parameterized.Parameters(name = "Execution mode = {0}")
+ public static Collection<TestExecutionMode[]> executionModes() {
+ TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
+ ArrayList<TestExecutionMode[]> temsList = new ArrayList<TestExecutionMode[]>();
+ temsList.add(tems);
+ return temsList;
+ }
+}
[2/2] flink git commit: [streaming] Added proper
StreamingMultipleProgramsBase
Posted by mb...@apache.org.
[streaming] Added proper StreamingMultipleProgramsBase
Also minor fixes for streaming complex integration test
Closes #520
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48e21a1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48e21a1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48e21a1a
Branch: refs/heads/master
Commit: 48e21a1ae5ac1e762aa670f53ef1a976bacabf8c
Parents: 4786b56
Author: mbalassi <mb...@apache.org>
Authored: Tue May 12 16:21:35 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed May 13 12:08:38 2015 +0200
----------------------------------------------------------------------
.../api/complex/ComplexIntegrationTest.java | 124 ++++++++++++-------
.../flink/streaming/util/RectangleClass.java | 43 -------
.../util/StreamingMultipleProgramsTestBase.java | 43 ++++++-
.../scala/table/test/AggregationsITCase.scala | 2 +-
.../flink/api/scala/table/test/AsITCase.scala | 2 +-
.../api/scala/table/test/CastingITCase.scala | 2 +-
.../scala/table/test/ExpressionsITCase.scala | 2 +-
.../api/scala/table/test/FilterITCase.scala | 2 +-
.../table/test/GroupedAggreagationsITCase.scala | 2 +-
.../flink/api/scala/table/test/JoinITCase.scala | 2 +-
.../api/scala/table/test/SelectITCase.scala | 2 +-
.../table/test/StringExpressionsITCase.scala | 2 +-
.../util/AbstractMultipleProgramsTestBase.java | 84 +++++++++++++
.../test/util/MultipleProgramsTestBase.java | 35 +-----
.../api/scala/actions/CountCollectITCase.scala | 2 +-
.../scala/functions/ClosureCleanerITCase.scala | 2 +-
.../scala/io/ScalaCsvReaderWithPOJOITCase.scala | 2 +-
.../api/scala/operators/AggregateITCase.scala | 2 +-
.../api/scala/operators/CoGroupITCase.scala | 2 +-
.../flink/api/scala/operators/CrossITCase.scala | 2 +-
.../api/scala/operators/DistinctITCase.scala | 2 +-
.../api/scala/operators/ExamplesITCase.scala | 2 +-
.../api/scala/operators/FilterITCase.scala | 2 +-
.../api/scala/operators/FirstNITCase.scala | 2 +-
.../api/scala/operators/FlatMapITCase.scala | 2 +-
.../scala/operators/GroupCombineITCase.scala | 2 +-
.../api/scala/operators/GroupReduceITCase.scala | 2 +-
.../flink/api/scala/operators/JoinITCase.scala | 2 +-
.../flink/api/scala/operators/MapITCase.scala | 2 +-
.../api/scala/operators/PartitionITCase.scala | 2 +-
.../api/scala/operators/ReduceITCase.scala | 2 +-
.../api/scala/operators/SumMinMaxITCase.scala | 2 +-
.../flink/api/scala/operators/UnionITCase.scala | 2 +-
.../scala/runtime/ScalaSpecialTypesITCase.scala | 2 +-
34 files changed, 236 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 74b5f1d..67c1387 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.RectangleClass;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
@@ -64,8 +63,11 @@ import java.util.HashMap;
import java.util.List;
@RunWith(Parameterized.class)
-public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase implements Serializable {
- private static final long serialVersionUID = 1L;
+public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+ // *************************************************************************
+ // GENERAL SETUP
+ // *************************************************************************
private String resultPath1;
private String resultPath2;
@@ -93,6 +95,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
compareResultsByLinesInMemory(expected2, resultPath2);
}
+ // *************************************************************************
+ // INTEGRATION TESTS
+ // *************************************************************************
+
@Test
public void complexIntegrationTest1() throws Exception {
//Testing data stream splitting with tuples
@@ -113,7 +119,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
//We create a separate environment for this test because of the slot-related to iteration issues.
StreamExecutionEnvironment env = new TestStreamEnvironment(4, 32); //StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.sum(0).setParallelism(1).filter(new FilterFunction
@@ -131,7 +136,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
step.select("firstOutput")
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
- step.select("secondOutput")//.print();
+ step.select("secondOutput")
.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
env.execute();
@@ -173,8 +178,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setBufferTimeout(100);
- env.setParallelism(4);
SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
@@ -215,7 +218,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
expected2 += "(" + 20000 + "," + 1 + ")";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
@@ -237,8 +239,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
return new Tuple2<Long, Integer>(value, 1);
}
})
-// .groupBy(0)
-// .sum(1)
.groupBy(0)
.window(Count.of(10000)).sum(1).flatten()
.filter(new FilterFunction<Tuple2<Long, Integer>>() {
@@ -269,7 +269,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
env.addSource(new RectangleSource())
.global()
@@ -303,7 +302,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
"12\n" + "15\n" + "16\n" + "20\n" + "25\n";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Long> dataStream51 = env.generateSequence(1, 5)
.map(new MapFunction<Long, Long>() {
@@ -345,7 +343,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
});
- dataStream53.merge(dataStream52)//.print();
+ dataStream53.merge(dataStream52)
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
env.execute();
@@ -445,7 +443,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
sourceStream6.window(Time.of(1, new Timestamp6()))
@@ -461,6 +458,10 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
}
+ // *************************************************************************
+ // FUNCTIONS
+ // *************************************************************************
+
private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
Double, Boolean>> {
@@ -473,36 +474,6 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
}
- public static class InnerPojo {
- public Long f0;
- public String f1;
-
- public InnerPojo(Long f0, String f1) {
- this.f0 = f0;
- this.f1 = f1;
- }
-
- @Override
- public String toString() {
- return "POJO(" + f0 + "," + f1 + ")";
- }
- }
-
- public static class OuterPojo {
- public InnerPojo f0;
- public Long f1;
-
- public OuterPojo(InnerPojo f0, Long f1) {
- this.f0 = f0;
- this.f1 = f1;
- }
-
- @Override
- public String toString() {
- return "POJO(" + f0 + "," + f1 + ")";
- }
- }
-
private static class PojoSource implements SourceFunction<OuterPojo> {
private static final long serialVersionUID = 1L;
@@ -746,4 +717,69 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase im
}
}
+ // *************************************************************************
+ // DATA TYPES
+ // *************************************************************************
+
+ //Flink Pojo
+ public static class InnerPojo {
+ public Long f0;
+ public String f1;
+
+ //default constructor to qualify as Flink POJO
+ InnerPojo(){}
+
+ public InnerPojo(Long f0, String f1) {
+ this.f0 = f0;
+ this.f1 = f1;
+ }
+
+ @Override
+ public String toString() {
+ return "POJO(" + f0 + "," + f1 + ")";
+ }
+ }
+
+ // Nested class serialized with Kryo
+ public static class OuterPojo {
+ public InnerPojo f0;
+ public Long f1;
+
+ public OuterPojo(InnerPojo f0, Long f1) {
+ this.f0 = f0;
+ this.f1 = f1;
+ }
+
+ @Override
+ public String toString() {
+ return "POJO(" + f0 + "," + f1 + ")";
+ }
+ }
+
+ public static class RectangleClass implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public int a;
+ public int b;
+
+ //default constructor to qualify as Flink POJO
+ public RectangleClass() {}
+
+ public RectangleClass(int a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ public RectangleClass next() {
+ return new RectangleClass(a + (b % 11), b + (a % 9));
+ }
+
+ @Override
+ public String toString() {
+ return "(" + a + "," + b + ")";
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
deleted file mode 100644
index 24fe1eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/RectangleClass.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-
-public class RectangleClass implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- public int a;
- public int b;
-
- public RectangleClass(int a, int b) {
- this.a = a;
- this.b = b;
- }
-
- public RectangleClass next() {
- return new RectangleClass(a + (b % 11), b + (a % 9));
- }
-
- @Override
- public String toString() {
- return "(" + a + "," + b + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 55fc5e9..36e62f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -18,18 +18,55 @@
package org.apache.flink.streaming.util;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase;
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collection;
-public class StreamingMultipleProgramsTestBase extends MultipleProgramsTestBase {
+/**
+ * Base class for streaming unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the StreamExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ * @Test
+ * public void someTest() {
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * @Test
+ * public void anotherTest() {
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * }</pre>
+ */
+public class StreamingMultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
+
public StreamingMultipleProgramsTestBase(TestExecutionMode mode) {
super(mode);
+ switch(this.mode){
+ case CLUSTER:
+ TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+ clusterEnv.setAsContext();
+ break;
+ case COLLECTION:
+ throw new UnsupportedOperationException("Flink streaming currently has no collection execution backend.");
+ }
}
-
@Parameterized.Parameters(name = "Execution mode = {0}")
public static Collection<TestExecutionMode[]> executionModes() {
TestExecutionMode[] tems = new TestExecutionMode[]{TestExecutionMode.CLUSTER};
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 9049f3c..38be85e 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 536bad4..3a0cc69 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 4c0e624..9557985 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 9a52f8f..51dc428 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index d1f5485..982a302 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index 30498fa..1f29722 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index 53d1da0..17221d8 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index 2396cea..6ba6c9f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index c5950bf..3f0f46f 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
new file mode 100644
index 0000000..ef81dfe
--- /dev/null
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractMultipleProgramsTestBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Abstract base class for unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the ExecutionEnvironment from
+ * the context:
+ *
+ * <pre>{@code
+ *
+ * @Test
+ * public void someTest() {
+ * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * @Test
+ * public void anotherTest() {
+ * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ * // test code
+ * env.execute();
+ * }
+ *
+ * }</pre>
+ */
+public abstract class AbstractMultipleProgramsTestBase extends TestBaseUtils {
+
+ /**
+ * Enum that defines which execution environment to run the next test on:
+ * An embedded local flink cluster, or the collection execution backend.
+ */
+ public enum TestExecutionMode {
+ CLUSTER,
+ COLLECTION
+ }
+
+ // -----------------------------------------------------------------------------------------...
+
+ private static final int DEFAULT_PARALLELISM = 4;
+
+ protected static ForkableFlinkMiniCluster cluster = null;
+
+ protected transient TestExecutionMode mode;
+
+ public AbstractMultipleProgramsTestBase(TestExecutionMode mode){
+ this.mode = mode;
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception{
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 3e4bb33..e0c4360 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,8 +18,6 @@
package org.apache.flink.test.util;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.runners.Parameterized;
import java.util.Arrays;
@@ -53,28 +51,11 @@ import java.util.Collection;
*
* }</pre>
*/
-public class MultipleProgramsTestBase extends TestBaseUtils {
-
- /**
- * Enum that defines which execution environment to run the next test on:
- * An embedded local flink cluster, or the collection execution backend.
- */
- public enum TestExecutionMode {
- CLUSTER,
- COLLECTION
- }
-
- // -----------------------------------------------------------------------------------------...
-
- private static final int DEFAULT_PARALLELISM = 4;
-
- protected static ForkableFlinkMiniCluster cluster = null;
-
- protected transient TestExecutionMode mode;
+public class MultipleProgramsTestBase extends AbstractMultipleProgramsTestBase {
public MultipleProgramsTestBase(TestExecutionMode mode){
- this.mode = mode;
- switch(mode){
+ super(mode);
+ switch(this.mode){
case CLUSTER:
TestEnvironment clusterEnv = new TestEnvironment(cluster, 4);
clusterEnv.setAsContext();
@@ -86,16 +67,6 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
}
}
- @BeforeClass
- public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false);
- }
-
- @AfterClass
- public static void teardown() throws Exception {
- stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
- }
-
@Parameterized.Parameters(name = "Execution mode = {0}")
public static Collection<TestExecutionMode[]> executionModes(){
return Arrays.asList(new TestExecutionMode[]{TestExecutionMode.CLUSTER},
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
index 2d2cdd3..d19f543 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/actions/CountCollectITCase.scala
@@ -19,8 +19,8 @@
package org.apache.flink.api.scala.actions
import org.apache.flink.api.scala._
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit.Test
import org.junit.Assert._
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 2cae79c..e9b4ffe 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.functions
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Assert.fail
import org.junit.{After, Before, Test, Rule}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
index 21aa93d..7e395d9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/ScalaCsvReaderWithPOJOITCase.scala
@@ -23,7 +23,7 @@ import com.google.common.io.Files
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.junit.Assert._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 484226d..432a6d4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 3379fe2..eaf4117 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index a8611c8..512ec6c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 31c6052..8c10271 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index 2bccc5b..7cf802c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, JavaProgramTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 8336ae3..082201b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index 9c59206..36cc3a7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index 4a66b80..61751da 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.junit.{Test, After, Before, Rule}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 380b3bc..1b40205 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.test.javaApiOperators.GroupCombineITCase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.junit._
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index fe9e3f3..b832647 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -28,7 +28,7 @@ CustomType}
import org.apache.flink.optimizer.Optimizer
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.apache.flink.util.Collector
import org.hamcrest.core.{IsNot, IsEqual}
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index 4135ab2..fc4a9ce 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichJoinFunction
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 5ade21f..64fdc1f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 98bb446..08d82d2 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunctio
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index 9e147b8..a9f420f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.scala.util.CollectionDataSets.MutableTuple3
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index d94f099..10bb7c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index 2cf3ab3..b0b3764 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder
http://git-wip-us.apache.org/repos/asf/flink/blob/48e21a1a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
index 63ec2a4..48a33f7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesITCase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.runtime
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.AbstractMultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit._
import org.junit.rules.TemporaryFolder