You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:43 UTC

[27/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
deleted file mode 100644
index 4c0f59f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class TypeFillTest extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	public void test() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		try {
-			env.addSource(new TestSource<Integer>()).print();
-			fail();
-		} catch (Exception e) {
-		}
-
-		DataStream<Long> source = env.generateSequence(1, 10);
-
-		try {
-			source.map(new TestMap<Long, Long>()).print();
-			fail();
-		} catch (Exception e) {
-		}
-		try {
-			source.flatMap(new TestFlatMap<Long, Long>()).print();
-			fail();
-		} catch (Exception e) {
-		}
-		try {
-			source.connect(source).map(new TestCoMap<Long, Long, Integer>()).print();
-			fail();
-		} catch (Exception e) {
-		}
-		try {
-			source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>()).print();
-			fail();
-		} catch (Exception e) {
-		}
-
-		env.addSource(new TestSource<Integer>()).returns("Integer");
-		source.map(new TestMap<Long, Long>()).returns(Long.class).print();
-		source.flatMap(new TestFlatMap<Long, Long>()).returns("Long").print();
-		source.connect(source).map(new TestCoMap<Long, Long, Integer>()).returns("Integer").print();
-		source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
-				.returns(BasicTypeInfo.INT_TYPE_INFO).print();
-		
-		assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
-				source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
-
-		SingleOutputStreamOperator<String, ?> map = source.map(new MapFunction<Long, String>() {
-
-			@Override
-			public String map(Long value) throws Exception {
-				return null;
-			}
-		});
-
-		map.print();
-		try {
-			map.returns("String");
-			fail();
-		} catch (Exception e) {
-		}
-
-	}
-
-	private class TestSource<T> implements SourceFunction<T> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void run(SourceContext<T> ctx) throws Exception {
-
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-
-	private class TestMap<T, O> implements MapFunction<T, O> {
-		@Override
-		public O map(T value) throws Exception {
-			return null;
-		}
-	}
-
-	private class TestFlatMap<T, O> implements FlatMapFunction<T, O> {
-		@Override
-		public void flatMap(T value, Collector<O> out) throws Exception {
-		}
-	}
-
-	private class TestCoMap<IN1, IN2, OUT> implements CoMapFunction<IN1, IN2, OUT> {
-
-		@Override
-		public OUT map1(IN1 value) {
-			return null;
-		}
-
-		@Override
-		public OUT map2(IN2 value) {
-			return null;
-		}
-
-	}
-
-	private class TestCoFlatMap<IN1, IN2, OUT> implements CoFlatMapFunction<IN1, IN2, OUT> {
-
-		@Override
-		public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
-		}
-
-		@Override
-		public void flatMap2(IN2 value, Collector<OUT> out) throws Exception {
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
deleted file mode 100644
index d2e24c9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.junit.Test;
-
-public class DirectedOutputTest extends StreamingMultipleProgramsTestBase {
-
-	private static final String TEN = "ten";
-	private static final String ODD = "odd";
-	private static final String EVEN = "even";
-	private static final String NON_SELECTED = "nonSelected";
-
-	static final class MyOutputSelector implements OutputSelector<Long> {
-		private static final long serialVersionUID = 1L;
-
-		List<String> outputs = new ArrayList<String>();
-
-		@Override
-		public Iterable<String> select(Long value) {
-			outputs.clear();
-			if (value % 2 == 0) {
-				outputs.add(EVEN);
-			} else {
-				outputs.add(ODD);
-			}
-
-			if (value == 10L) {
-				outputs.add(TEN);
-			}
-
-			if (value == 11L) {
-				outputs.add(NON_SELECTED);
-			}
-			return outputs;
-		}
-	}
-
-	static final class ListSink implements SinkFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		private String name;
-		private transient List<Long> list;
-
-		public ListSink(String name) {
-			this.name = name;
-		}
-
-		@Override
-		public void invoke(Long value) {
-			list.add(value);
-		}
-
-		private void readObject(java.io.ObjectInputStream in) throws IOException,
-				ClassNotFoundException {
-			in.defaultReadObject();
-			outputs.put(name, new ArrayList<Long>());
-			this.list = outputs.get(name);
-		}
-
-	}
-
-	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
-
-	@Test
-	public void outputSelectorTest() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
-		TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
-		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
-		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
-
-		SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
-		source.select(EVEN).addSink(evenSink);
-		source.select(ODD, TEN).addSink(oddAndTenSink);
-		source.select(EVEN, ODD).addSink(evenAndOddSink);
-		source.addSink(allSink);
-
-		env.execute();
-		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
-				evenAndOddSink.getSortedResult());
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
-				allSink.getSortedResult());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
deleted file mode 100644
index a3d89f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/OutputSelectorTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.junit.Test;
-
-public class OutputSelectorTest {
-
-	static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Iterable<String> select(Tuple1<Integer> tuple) {
-
-			String[] outputs = new String[tuple.f0];
-
-			for (Integer i = 0; i < tuple.f0; i++) {
-				outputs[i] = i.toString();
-			}
-			return Arrays.asList(outputs);
-		}
-	}
-
-	@Test
-	public void testGetOutputs() {
-		OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
-		List<String> expectedOutputs = new ArrayList<String>();
-		expectedOutputs.add("0");
-		expectedOutputs.add("1");
-		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
-		expectedOutputs.add("2");
-		assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
deleted file mode 100644
index 020dda3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ /dev/null
@@ -1,837 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.complex;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@SuppressWarnings("serial")
-public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
-
-	// *************************************************************************
-	// GENERAL SETUP
-	// *************************************************************************
-
-	private String resultPath1;
-	private String resultPath2;
-	private String expected1;
-	private String expected2;
-	
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception {
-		resultPath1 = tempFolder.newFile().toURI().toString();
-		resultPath2 = tempFolder.newFile().toURI().toString();
-		expected1 = "";
-		expected2 = "";
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected1, resultPath1);
-		compareResultsByLinesInMemory(expected2, resultPath2);
-	}
-
-	// *************************************************************************
-	// INTEGRATION TESTS
-	// *************************************************************************
-
-	@Test
-	public void complexIntegrationTest1() throws Exception {
-		//Testing data stream splitting with tuples
-
-		expected1 = "";
-		for (int i = 0; i < 8; i++) {
-			expected1 += "(10,(a,1))\n";
-		}
-		//i == 8
-		expected1 += "(10,(a,1))";
-
-		expected2 = "";
-		for (int i = 0; i < 18; i++) {
-			expected2 += "(20,(a,1))\n";
-		}
-		//i == 18
-		expected2 += "(20,(a,1))";
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);
-
-		IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
-
-					Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(
-							0L, new Tuple2<>("", 0L));
-
-					@Override
-					public Tuple2<Long, Tuple2<String, Long>> map(
-							Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
-						result.f0 = result.f0 + value.f0;
-						result.f1 = value.f1;
-						return result;
-			}
-			
-		})
-				.setParallelism(1).filter(new FilterFunction
-				<Tuple2<Long, Tuple2<String, Long>>>() {
-
-			@Override
-			public boolean filter(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
-				return value.f0 < 20;
-			}
-		}).iterate(5000);
-
-		SplitStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new
-				MyOutputSelector());
-		it.closeWith(step.select("iterate"));
-
-		step.select("firstOutput")
-				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-		step.select("secondOutput")
-				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-	}
-
-	// Disabled, because it depends on strange behaviour, for example of the sum() function.
-	// This test even fails, for example, if the order of only two lines in the "input" is changed.
-	@SuppressWarnings("unchecked")
-	@Ignore
-	@Test
-	public void complexIntegrationTest2() throws Exception {
-		//Testing POJO source, grouping by multiple fields and windowing with timestamp
-
-		expected1 = "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
-				"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
-				"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
-				"water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" + "water_melon-b\n" +
-				"water_melon-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" + "orange-b\n" +
-				"orange-b\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-c\n" + "orange-d\n" + "orange-d\n" +
-				"peach-d\n" + "peach-d\n";
-
-		List<Tuple5<Integer, String, Character, Double, Boolean>> input = Arrays.asList(
-				new Tuple5<>(1, "apple", 'j', 0.1, false),
-				new Tuple5<>(1, "peach", 'b', 0.8, false),
-				new Tuple5<>(1, "orange", 'c', 0.7, true),
-				new Tuple5<>(2, "apple", 'd', 0.5, false),
-				new Tuple5<>(2, "peach", 'j', 0.6, false),
-				new Tuple5<>(3, "orange", 'b', 0.2, true),
-				new Tuple5<>(6, "apple", 'c', 0.1, false),
-				new Tuple5<>(7, "peach", 'd', 0.4, false),
-				new Tuple5<>(8, "orange", 'j', 0.2, true),
-				new Tuple5<>(10, "apple", 'b', 0.1, false),
-				new Tuple5<>(10, "peach", 'c', 0.5, false),
-				new Tuple5<>(11, "orange", 'd', 0.3, true),
-				new Tuple5<>(11, "apple", 'j', 0.3, false),
-				new Tuple5<>(12, "peach", 'b', 0.9, false),
-				new Tuple5<>(13, "orange", 'c', 0.7, true),
-				new Tuple5<>(15, "apple", 'd', 0.2, false),
-				new Tuple5<>(16, "peach", 'j', 0.8, false),
-				new Tuple5<>(16, "orange", 'b', 0.8, true),
-				new Tuple5<>(16, "apple", 'c', 0.1, false),
-				new Tuple5<>(17, "peach", 'd', 1.0, true));
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableTimestamps();
-
-		SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
-		DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
-
-		sourceStream21
-				.assignTimestamps(new MyTimestampExtractor())
-				.keyBy(2, 2)
-				.timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS))
-				.maxBy(3)
-				.map(new MyMapFunction2())
-				.flatMap(new MyFlatMapFunction())
-				.connect(sourceStream22)
-				.map(new MyCoMapFunction())
-				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void complexIntegrationTest3() throws Exception {
-		//Heavy prime factorisation with maps and flatmaps
-
-		expected1 = "541\n" + "1223\n" + "3319\n" + "5851\n" + "1987\n" + "8387\n" + "15907\n" + "10939\n" +
-				"4127\n" + "2477\n" + "6737\n" + "13421\n" + "4987\n" + "4999\n" + "18451\n" + "9283\n" + "7499\n" +
-				"16937\n" + "11927\n" + "9973\n" + "14431\n" + "19507\n" + "12497\n" + "17497\n" + "14983\n" +
-				"19997\n";
-
-		expected1 = "541\n" + "1223\n" + "1987\n" + "2741\n" + "3571\n" + "10939\n" + "4409\n" +
-				"5279\n" + "11927\n" + "6133\n" + "6997\n" + "12823\n" + "7919\n" + "8831\n" +
-				"13763\n" + "9733\n" + "9973\n" + "14759\n" + "15671\n" + "16673\n" + "17659\n" +
-				"18617\n" + "19697\n" + "19997\n";
-
-		for (int i = 2; i < 100; i++) {
-			expected2 += "(" + i + "," + 20000 / i + ")\n";
-		}
-		for (int i = 19901; i < 20000; i++) {
-			expected2 += "(" + i + "," + 20000 / i + ")\n";
-		}
-		//i == 20000
-		expected2 += "(" + 20000 + "," + 1 + ")";
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// set to parallelism 1 because otherwise we don't know which elements go to which parallel
-		// count-window.
-		env.setParallelism(1);
-
-		env.setBufferTimeout(0);
-
-		DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
-		DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
-
-		sourceStream31.filter(new PrimeFilterFunction())
-				.windowAll(GlobalWindows.create())
-				.trigger(PurgingTrigger.of(CountTrigger.of(100)))
-				.max(0)
-				.union(sourceStream32.filter(new PrimeFilterFunction())
-						.windowAll(GlobalWindows.create())
-						.trigger(PurgingTrigger.of(CountTrigger.of(100)))
-						.max(0))
-				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
-		sourceStream31.flatMap(new DivisorsFlatMapFunction())
-				.union(sourceStream32.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
-				Integer>>() {
-
-			@Override
-			public Tuple2<Long, Integer> map(Long value) throws Exception {
-				return new Tuple2<>(value, 1);
-			}
-		})
-				.keyBy(0)
-				.window(GlobalWindows.create())
-				.trigger(PurgingTrigger.of(CountTrigger.of(10_000)))
-				.sum(1)
-				.filter(new FilterFunction<Tuple2<Long, Integer>>() {
-
-					@Override
-					public boolean filter(Tuple2<Long, Integer> value) throws Exception {
-						return value.f0 < 100 || value.f0 > 19900;
-					}
-				})
-				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-	}
-
-	@Test
-	@Ignore
-	public void complexIntegrationTest4() throws Exception {
-		//Testing mapping and delta-policy windowing with custom class
-
-		expected1 = "((100,100),0)\n" + "((120,122),5)\n" + "((121,125),6)\n" + "((138,144),9)\n" +
-				"((139,147),10)\n" + "((156,166),13)\n" + "((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" +
-				"((192,210),21)\n" + "((193,213),22)\n" + "((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" +
-				"((229,257),30)\n" + "((246,276),33)\n" + "((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" +
-				"((282,320),41)\n" + "((283,323),42)\n" + "((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" +
-				"((319,367),50)\n" + "((336,386),53)\n" + "((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" +
-				"((372,430),61)\n" + "((373,433),62)\n" + "((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" +
-				"((409,477),70)\n" + "((426,496),73)\n" + "((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" +
-				"((462,540),81)\n" + "((463,543),82)\n" + "((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" +
-				"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		env.addSource(new RectangleSource())
-				.global()
-				.map(new RectangleMapFunction())
-				.windowAll(GlobalWindows.create())
-				.trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, new MyDelta())))
-				.apply(new MyWindowMapFunction())
-				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-	}
-
-	private static class MyDelta implements DeltaFunction<Tuple2<Rectangle, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public double getDelta(Tuple2<Rectangle, Integer> oldDataPoint, Tuple2<Rectangle,
-				Integer> newDataPoint) {
-			return (newDataPoint.f0.b - newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a);
-		}
-	}
-
-
-	@Test
-	public void complexIntegrationTest5() throws Exception {
-		//Turning on and off chaining
-
-		expected1 = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" + "5\n" + "5\n" +
-				"5\n" + "5\n" + "5\n";
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// Set to parallelism 1 to make it deterministic, otherwise, it is not clear which
-		// elements will go to which parallel instance of the fold
-		env.setParallelism(1);
-		
-		env.setBufferTimeout(0);
-
-		DataStream<Long> dataStream51 = env.generateSequence(1, 5)
-				.map(new MapFunction<Long, Long>() {
-
-					@Override
-					public Long map(Long value) throws Exception {
-						return value;
-					}
-				}).startNewChain()
-				.filter(new FilterFunction<Long>() {
-
-					@Override
-					public boolean filter(Long value) throws Exception {
-						return true;
-					}
-				}).disableChaining()
-				.flatMap(new SquareFlatMapFunction());
-
-		DataStream<Long> dataStream53 = dataStream51.map(new MapFunction<Long, Long>() {
-
-			@Override
-			public Long map(Long value) throws Exception {
-				return value;
-			}
-		});
-
-
-		dataStream53.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-	}
-
-
-	@Test
-	@Ignore
-	public void complexIntegrationTest6() throws Exception {
-		//Testing java collections and date-time types
-
-		expected1 = "(6,(a,6))\n" + "(6,(b,3))\n" + "(6,(c,4))\n" + "(6,(d,2))\n" + "(6,(f,2))\n" +
-				"(7,(a,1))\n" + "(7,(b,2))\n" + "(7,(c,3))\n" + "(7,(d,1))\n" + "(7,(e,1))\n" + "(7,(f,1))\n" +
-				"(8,(a,6))\n" + "(8,(b,4))\n" + "(8,(c,5))\n" + "(8,(d,1))\n" + "(8,(e,2))\n" + "(8,(f,2))\n" +
-				"(9,(a,4))\n" + "(9,(b,4))\n" + "(9,(c,7))\n" + "(9,(d,3))\n" + "(9,(e,1))\n" + "(9,(f,2))\n" +
-				"(10,(a,3))\n" + "(10,(b,2))\n" + "(10,(c,3))\n" + "(10,(d,2))\n" + "(10,(e,1))\n" + "(10,(f,1))";
-		expected2 = "[a, a, c, c, d, f]\n" + "[a, b, b, d]\n" + "[a, a, a, b, c, c, f]\n" + "[a, d, e]\n" +
-				"[b, b, c, c, c, f]\n" + "[a, a, a, a, b, b, c, c, e]\n" + "[a, a, b, b, c, c, c, d, e, f, f]\n" +
-				"[a, a, a, b, c, c, c, d, d, f]\n" + "[a, b, b, b, c, c, c, c, d, e, f]\n" +
-				"[a, a, a, b, b, c, c, c, d, d, e, f]";
-
-		SimpleDateFormat ft = new SimpleDateFormat("dd-MM-yyyy");
-
-		ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<>();
-		HashMap<Character, Integer> sale1 = new HashMap<>();
-		sale1.put('a', 2);
-		sale1.put('c', 2);
-		sale1.put('d', 1);
-		sale1.put('f', 1);
-		sales.add(new Tuple2<>(ft.parse("03-06-2014"), sale1));
-
-		HashMap<Character, Integer> sale2 = new HashMap<>();
-		sale2.put('a', 1);
-		sale2.put('b', 2);
-		sale2.put('d', 1);
-		sales.add(new Tuple2<>(ft.parse("10-06-2014"), sale2));
-
-		HashMap<Character, Integer> sale3 = new HashMap<>();
-		sale3.put('a', 3);
-		sale3.put('b', 1);
-		sale3.put('c', 2);
-		sale3.put('f', 1);
-		sales.add(new Tuple2<>(ft.parse("29-06-2014"), sale3));
-
-		HashMap<Character, Integer> sale4 = new HashMap<>();
-		sale4.put('a', 1);
-		sale4.put('d', 1);
-		sale4.put('e', 1);
-		sales.add(new Tuple2<>(ft.parse("15-07-2014"), sale4));
-
-		HashMap<Character, Integer> sale5 = new HashMap<>();
-		sale5.put('b', 2);
-		sale5.put('c', 3);
-		sale5.put('f', 1);
-		sales.add(new Tuple2<>(ft.parse("24-07-2014"), sale5));
-
-		HashMap<Character, Integer> sale6 = new HashMap<>();
-		sale6.put('a', 4);
-		sale6.put('b', 2);
-		sale6.put('c', 2);
-		sale6.put('e', 1);
-		sales.add(new Tuple2<>(ft.parse("17-08-2014"), sale6));
-
-		HashMap<Character, Integer> sale7 = new HashMap<>();
-		sale7.put('a', 2);
-		sale7.put('b', 2);
-		sale7.put('c', 3);
-		sale7.put('d', 1);
-		sale7.put('e', 1);
-		sale7.put('f', 2);
-		sales.add(new Tuple2<>(ft.parse("27-08-2014"), sale7));
-
-		HashMap<Character, Integer> sale8 = new HashMap<>();
-		sale8.put('a', 3);
-		sale8.put('b', 1);
-		sale8.put('c', 3);
-		sale8.put('d', 2);
-		sale8.put('f', 1);
-		sales.add(new Tuple2<>(ft.parse("16-09-2014"), sale8));
-
-		HashMap<Character, Integer> sale9 = new HashMap<>();
-		sale9.put('a', 1);
-		sale9.put('b', 3);
-		sale9.put('c', 4);
-		sale9.put('d', 1);
-		sale9.put('e', 1);
-		sale9.put('f', 1);
-		sales.add(new Tuple2<>(ft.parse("25-09-2014"), sale9));
-
-		HashMap<Character, Integer> sale10 = new HashMap<>();
-		sale10.put('a', 3);
-		sale10.put('b', 2);
-		sale10.put('c', 3);
-		sale10.put('d', 2);
-		sale10.put('e', 1);
-		sale10.put('f', 1);
-		sales.add(new Tuple2<>(ft.parse("01-10-2014"), sale10));
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableTimestamps();
-
-		DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
-		sourceStream6
-				.assignTimestamps(new Timestamp6())
-				.timeWindowAll(Time.of(1, TimeUnit.MILLISECONDS))
-				.reduce(new SalesReduceFunction())
-				.flatMap(new FlatMapFunction6())
-				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
-
-		sourceStream6.map(new MapFunction6())
-				.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
-
-		env.execute();
-
-	}
-
-	// *************************************************************************
-	// FUNCTIONS
-	// *************************************************************************
-
-	private static class MyMapFunction2 implements MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String,
-			Double, Boolean>> {
-
-		@Override
-		public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double,
-				Boolean> value) throws Exception {
-			return new Tuple4<>(value.f0, value.f1 + "-" + value.f2,
-					value.f3, value.f4);
-		}
-
-	}
-
-	private static class PojoSource implements SourceFunction<OuterPojo> {
-		private static final long serialVersionUID = 1L;
-
-		long cnt = 0;
-
-		@Override
-		public void run(SourceContext<OuterPojo> ctx) throws Exception {
-			for (int i = 0; i < 20; i++) {
-				OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L);
-				ctx.collect(result);
-			}
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-
-	private static class TupleSource implements SourceFunction<Tuple2<Long, Tuple2<String, Long>>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, Tuple2<String, Long>>> ctx) throws Exception {
-			for (int i = 0; i < 20; i++) {
-				Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(1L, new Tuple2<>("a", 1L));
-				ctx.collect(result);
-			}
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-
-	private class IncrementMap implements MapFunction<Tuple2<Long, Tuple2<String, Long>>, Tuple2<Long, Tuple2<String,
-			Long>>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
-			return new Tuple2<>(value.f0 + 1, value.f1);
-		}
-	}
-
-	private static class MyTimestampExtractor implements TimestampExtractor<Tuple5<Integer, String, Character, Double, Boolean>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long extractTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value, long currentTimestamp) {
-			return (long) value.f0;
-		}
-
-		@Override
-		public long extractWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
-				long currentTimestamp) {
-			return (long) value.f0 - 1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return Long.MIN_VALUE;
-		}
-	}
-
-	private static class MyFlatMapFunction implements FlatMapFunction<Tuple4<Integer, String, Double,
-			Boolean>, OuterPojo> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Tuple4<Integer, String, Double, Boolean> value, Collector<OuterPojo> out) throws
-				Exception {
-			if (value.f3) {
-				for (int i = 0; i < 2; i++) {
-					out.collect(new OuterPojo(new InnerPojo((long) value.f0, value.f1), (long) i));
-				}
-			}
-		}
-	}
-
-	private class MyCoMapFunction implements CoMapFunction<OuterPojo, OuterPojo, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(OuterPojo value) {
-			return value.f0.f1;
-		}
-
-		@Override
-		public String map2(OuterPojo value) {
-			return value.f0.f1;
-		}
-	}
-
-	private class MyOutputSelector implements OutputSelector<Tuple2<Long, Tuple2<String, Long>>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Iterable<String> select(Tuple2<Long, Tuple2<String, Long>> value) {
-			List<String> output = new ArrayList<>();
-			if (value.f0 == 10) {
-				output.add("iterate");
-				output.add("firstOutput");
-			} else if (value.f0 == 20) {
-				output.add("secondOutput");
-			} else {
-				output.add("iterate");
-			}
-			return output;
-		}
-	}
-
-	private static class PrimeFilterFunction implements FilterFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Long value) throws Exception {
-			if (value < 2) {
-				return false;
-			} else {
-				for (long i = 2; i < value; i++) {
-					if (value % i == 0) {
-						return false;
-					}
-				}
-			}
-			return true;
-		}
-	}
-
-	private static class DivisorsFlatMapFunction implements FlatMapFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Long value, Collector<Long> out) throws Exception {
-			for (long i = 2; i <= value; i++) {
-				if (value % i == 0) {
-					out.collect(i);
-				}
-			}
-		}
-	}
-
-	private static class RectangleSource extends RichSourceFunction<Rectangle> {
-		private static final long serialVersionUID = 1L;
-		private transient Rectangle rectangle;
-
-		public void open(Configuration parameters) throws Exception {
-			rectangle = new Rectangle(100, 100);
-		}
-
-		@Override
-		public void run(SourceContext<Rectangle> ctx) throws Exception {
-			// emit once as the initializer of the delta trigger
-			ctx.collect(rectangle);
-			for (int i = 0; i < 100; i++) {
-				ctx.collect(rectangle);
-				rectangle = rectangle.next();
-			}
-		}
-
-		@Override
-		public void cancel() {
-		}
-	}
-
-	private static class RectangleMapFunction implements MapFunction<Rectangle, Tuple2<Rectangle, Integer>> {
-		private static final long serialVersionUID = 1L;
-		private int counter = 0;
-
-		@Override
-		public Tuple2<Rectangle, Integer> map(Rectangle value) throws Exception {
-			return new Tuple2<>(value, counter++);
-		}
-	}
-
-	private static class MyWindowMapFunction implements AllWindowFunction<Tuple2<Rectangle, Integer>, Tuple2<Rectangle, Integer>, GlobalWindow> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void apply(GlobalWindow window, Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle,
-				Integer>> out) throws Exception {
-			out.collect(values.iterator().next());
-		}
-	}
-
-	private static class SquareFlatMapFunction implements FlatMapFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Long value, Collector<Long> out) throws Exception {
-			for (long i = 0; i < value; i++) {
-				out.collect(value);
-			}
-		}
-	}
-
-	private static class Timestamp6 implements TimestampExtractor<Tuple2<Date, HashMap<Character, Integer>>> {
-
-		@Override
-		public long extractTimestamp(Tuple2<Date, HashMap<Character, Integer>> value,
-				long currentTimestamp) {
-			Calendar cal = Calendar.getInstance();
-			cal.setTime(value.f0);
-			return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH);
-		}
-
-		@Override
-		public long extractWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
-				long currentTimestamp) {
-			Calendar cal = Calendar.getInstance();
-			cal.setTime(value.f0);
-			return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH) - 1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return 0;
-		}
-	}
-
-	private static class SalesReduceFunction implements ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Date, HashMap<Character, Integer>> reduce(Tuple2<Date, HashMap<Character, Integer>> value1,
-				Tuple2<Date,
-						HashMap<Character, Integer>> value2) throws Exception {
-			HashMap<Character, Integer> map1 = value1.f1;
-			HashMap<Character, Integer> map2 = value2.f1;
-			for (Character key : map2.keySet()) {
-				Integer volume1 = map1.get(key);
-				Integer volume2 = map2.get(key);
-				if (volume1 == null) {
-					volume1 = 0;
-				}
-				map1.put(key, volume1 + volume2);
-			}
-			return new Tuple2<>(value2.f0, map1);
-		}
-	}
-
-	private static class FlatMapFunction6 implements FlatMapFunction<Tuple2<Date, HashMap<Character, Integer>>, Tuple2<Integer,
-			Tuple2<Character, Integer>>> {
-
-		@Override
-		public void flatMap(Tuple2<Date, HashMap<Character, Integer>> value, Collector<Tuple2<Integer,
-				Tuple2<Character, Integer>>> out) throws Exception {
-			Calendar cal = Calendar.getInstance();
-			cal.setTime(value.f0);
-			for (Character key : value.f1.keySet()) {
-				out.collect(new Tuple2<>(cal.get(Calendar.MONTH)
-						+ 1,
-						new Tuple2<>(key, value.f1.get(key))));
-			}
-		}
-	}
-
-	private static class MapFunction6 implements MapFunction<Tuple2<Date, HashMap<Character, Integer>>, ArrayList<Character>> {
-
-		@Override
-		public ArrayList<Character> map(Tuple2<Date, HashMap<Character, Integer>> value)
-				throws Exception {
-			ArrayList<Character> list = new ArrayList<>();
-			for (Character ch : value.f1.keySet()) {
-				for (int i = 0; i < value.f1.get(ch); i++) {
-					list.add(ch);
-				}
-			}
-			Collections.sort(list);
-			return list;
-		}
-	}
-
-	// *************************************************************************
-	// DATA TYPES
-	// *************************************************************************
-
-	//Flink Pojo
-	public static class InnerPojo {
-		public Long f0;
-		public String f1;
-
-		//default constructor to qualify as Flink POJO
-		InnerPojo(){}
-
-		public InnerPojo(Long f0, String f1) {
-			this.f0 = f0;
-			this.f1 = f1;
-		}
-
-		@Override
-		public String toString() {
-			return "POJO(" + f0 + "," + f1 + ")";
-		}
-	}
-
-	// Nested class serialized with Kryo
-	public static class OuterPojo {
-		public InnerPojo f0;
-		public Long f1;
-
-		public OuterPojo(InnerPojo f0, Long f1) {
-			this.f0 = f0;
-			this.f1 = f1;
-		}
-
-		@Override
-		public String toString() {
-			return "POJO(" + f0 + "," + f1 + ")";
-		}
-	}
-
-	public static class Rectangle {
-
-		public int a;
-		public int b;
-
-		//default constructor to qualify as Flink POJO
-		public Rectangle() {}
-
-		public Rectangle(int a, int b) {
-			this.a = a;
-			this.b = b;
-		}
-
-		public Rectangle next() {
-			return new Rectangle(a + (b % 11), b + (a % 9));
-		}
-
-		@Override
-		public String toString() {
-			return "(" + a + "," + b + ")";
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
deleted file mode 100644
index 41bd381..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.source.FromElementsFunction}.
- */
-public class FromElementsFunctionTest {
-	
-	@Test
-	public void testStrings() {
-		try {
-			String[] data = { "Oh", "boy", "what", "a", "show", "!"};
-
-			FromElementsFunction<String> source = new FromElementsFunction<String>(
-					BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), data);
-			
-			List<String> result = new ArrayList<String>();
-			source.run(new ListSourceContext<String>(result));
-			
-			assertEquals(Arrays.asList(data), result);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testNonJavaSerializableType() {
-		try {
-			MyPojo[] data = { new MyPojo(1, 2), new MyPojo(3, 4), new MyPojo(5, 6) };
-
-			FromElementsFunction<MyPojo> source = new FromElementsFunction<MyPojo>(
-					TypeExtractor.getForClass(MyPojo.class).createSerializer(new ExecutionConfig()), data);
-
-			List<MyPojo> result = new ArrayList<MyPojo>();
-			source.run(new ListSourceContext<MyPojo>(result));
-
-			assertEquals(Arrays.asList(data), result);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSerializationError() {
-		try {
-			TypeInformation<SerializationErrorType> info = 
-					new ValueTypeInfo<SerializationErrorType>(SerializationErrorType.class);
-			
-			try {
-				new FromElementsFunction<SerializationErrorType>(
-					info.createSerializer(new ExecutionConfig()), new SerializationErrorType());
-				
-				fail("should fail with an exception");
-			}
-			catch (IOException e) {
-				assertTrue(ExceptionUtils.stringifyException(e).contains("test exception"));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testDeSerializationError() {
-		try {
-			TypeInformation<DeserializeTooMuchType> info =
-					new ValueTypeInfo<DeserializeTooMuchType>(DeserializeTooMuchType.class);
-
-			FromElementsFunction<DeserializeTooMuchType> source = new FromElementsFunction<DeserializeTooMuchType>(
-					info.createSerializer(new ExecutionConfig()), new DeserializeTooMuchType());
-			
-			try {
-				source.run(new ListSourceContext<DeserializeTooMuchType>(new ArrayList<DeserializeTooMuchType>()));
-				fail("should fail with an exception");
-			}
-			catch (IOException e) {
-				assertTrue(ExceptionUtils.stringifyException(e).contains("user-defined serialization"));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCheckpointAndRestore() {
-		try {
-			final int NUM_ELEMENTS = 10000;
-			
-			List<Integer> data = new ArrayList<Integer>(NUM_ELEMENTS);
-			List<Integer> result = new ArrayList<Integer>(NUM_ELEMENTS);
-			
-			for (int i = 0; i < NUM_ELEMENTS; i++) {
-				data.add(i);
-			}
-			
-			final FromElementsFunction<Integer> source = new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
-			final FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source);
-			
-			final SourceFunction.SourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L);
-			
-			final Throwable[] error = new Throwable[1];
-			
-			// run the source asynchronously
-			Thread runner = new Thread() {
-				@Override
-				public void run() {
-					try {
-						source.run(ctx);
-					}
-					catch (Throwable t) {
-						error[0] = t;
-					}
-				}
-			};
-			runner.start();
-			
-			// wait for a bit 
-			Thread.sleep(1000);
-			
-			// make a checkpoint
-			int count;
-			List<Integer> checkpointData = new ArrayList<Integer>(NUM_ELEMENTS);
-			
-			synchronized (ctx.getCheckpointLock()) {
-				count = source.snapshotState(566, System.currentTimeMillis());
-				checkpointData.addAll(result);
-			}
-			
-			// cancel the source
-			source.cancel();
-			runner.join();
-			
-			// check for errors
-			if (error[0] != null) {
-				System.err.println("Error in asynchronous source runner");
-				error[0].printStackTrace();
-				fail("Error in asynchronous source runner");
-			}
-			
-			// recovery run
-			SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<Integer>(checkpointData);
-			sourceCopy.restoreState(count);
-			
-			sourceCopy.run(newCtx);
-			
-			assertEquals(data, checkpointData);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	
-	// ------------------------------------------------------------------------
-	//  Test Types
-	// ------------------------------------------------------------------------
-	
-	public static class MyPojo {
-		
-		public long val1;
-		public int val2;
-
-		public MyPojo() {}
-		
-		public MyPojo(long val1, int val2) {
-			this.val1 = val1;
-			this.val2 = val2;
-		}
-
-		@Override
-		public int hashCode() {
-			return this.val2;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof MyPojo) {
-				MyPojo that = (MyPojo) obj;
-				return this.val1 == that.val1 && this.val2 == that.val2; 
-			}
-			else {
-				return false;
-			}
-		}
-	}
-	
-	public static class SerializationErrorType implements Value {
-
-		private static final long serialVersionUID = -6037206294939421807L;
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			throw new IOException("test exception");
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			throw new IOException("test exception");
-		}
-	}
-
-	public static class DeserializeTooMuchType implements Value {
-
-		private static final long serialVersionUID = -6037206294939421807L;
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeInt(42);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			in.readLong();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
deleted file mode 100644
index e4dadf0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.util.List;
-
-/**
- * Mock context that collects elements in a List.
- * 
- * @param <T> Type of the collected elements.
- */
-public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
-	
-	private final Object lock = new Object();
-	
-	private final List<T> target;
-
-	private final long delay;
-	
-	
-	public ListSourceContext(List<T> target) {
-		this(target, 0L);
-	}
-
-	public ListSourceContext(List<T> target, long delay) {
-		this.target = target;
-		this.delay = delay;
-	}
-
-	@Override
-	public void collect(T element) {
-		target.add(element);
-		
-		if (delay > 0) {
-			try {
-				Thread.sleep(delay);
-			}
-			catch (InterruptedException e) {
-				// ignore
-			}
-		}
-	}
-
-	@Override
-	public void collectWithTimestamp(T element, long timestamp) {
-		target.add(element);
-	}
-
-	@Override
-	public void emitWatermark(Watermark mark) {
-		// don't do anything
-	}
-
-	@Override
-	public Object getCheckpointLock() {
-		return lock;
-	}
-
-	@Override
-	public void close() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
deleted file mode 100644
index f7c6e53..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
- */
-public class PrintSinkFunctionTest {
-
-	public PrintStream printStreamOriginal = System.out;
-	private String line = System.lineSeparator();
-
-	@Test
-	public void testPrintSinkStdOut(){
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		PrintStream stream = new PrintStream(baos);
-		System.setOut(stream);
-
-		final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
-
-		PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
-		printSink.setRuntimeContext(ctx);
-		try {
-			printSink.open(new Configuration());
-		} catch (Exception e) {
-			Assert.fail();
-		}
-		printSink.setTargetToStandardOut();
-		printSink.invoke("hello world!");
-
-		assertEquals("Print to System.out", printSink.toString());
-		assertEquals("hello world!" + line, baos.toString());
-
-		printSink.close();
-		stream.close();
-	}
-
-	@Test
-	public void testPrintSinkStdErr(){
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		PrintStream stream = new PrintStream(baos);
-		System.setOut(stream);
-
-		final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
-
-		PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
-		printSink.setRuntimeContext(ctx);
-		try {
-			printSink.open(new Configuration());
-		} catch (Exception e) {
-			Assert.fail();
-		}
-		printSink.setTargetToStandardErr();
-		printSink.invoke("hello world!");
-
-		assertEquals("Print to System.err", printSink.toString());
-		assertEquals("hello world!" + line, baos.toString());
-
-		printSink.close();
-		stream.close();
-	}
-
-	@Test
-	public void testPrintSinkWithPrefix(){
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		PrintStream stream = new PrintStream(baos);
-		System.setOut(stream);
-
-		final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
-		Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
-		Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
-
-		PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
-		printSink.setRuntimeContext(ctx);
-		try {
-			printSink.open(new Configuration());
-		} catch (Exception e) {
-			Assert.fail();
-		}
-		printSink.setTargetToStandardErr();
-		printSink.invoke("hello world!");
-
-		assertEquals("Print to System.err", printSink.toString());
-		assertEquals("2> hello world!" + line, baos.toString());
-
-		printSink.close();
-		stream.close();
-	}
-
-	@After
-	public void restoreSystemOut() {
-		System.setOut(printStreamOriginal);
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
deleted file mode 100644
index 8f4acde..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
- */
-@SuppressWarnings("serial")
-public class SocketClientSinkTest extends TestLogger {
-
-	private static final String TEST_MESSAGE = "testSocketSinkInvoke";
-
-	private static final String EXCEPTION_MESSGAE = "Failed to send message '" + TEST_MESSAGE + "\n'";
-
-	private static final String host = "127.0.0.1";
-
-	private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
-		@Override
-		public byte[] serialize(String element) {
-			return element.getBytes();
-		}
-	};
-
-	@Test
-	public void testSocketSink() throws Exception {
-		final ServerSocket server = new ServerSocket(0);
-		final int port = server.getLocalPort();
-
-		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-		Thread sinkRunner = new Thread("Test sink runner") {
-			@Override
-			public void run() {
-				try {
-					SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0);
-					simpleSink.open(new Configuration());
-					simpleSink.invoke(TEST_MESSAGE + '\n');
-					simpleSink.close();
-				}
-				catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-
-		sinkRunner.start();
-
-		Socket sk = server.accept();
-		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
-
-		String value = rdr.readLine();
-
-		sinkRunner.join();
-		server.close();
-
-		if (error.get() != null) {
-			Throwable t = error.get();
-			t.printStackTrace();
-			fail("Error in spawned thread: " + t.getMessage());
-		}
-
-		assertEquals(TEST_MESSAGE, value);
-	}
-
-	@Test
-	public void testSinkAutoFlush() throws Exception {
-		final ServerSocket server = new ServerSocket(0);
-		final int port = server.getLocalPort();
-
-		final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
-		simpleSink.open(new Configuration());
-
-		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-		Thread sinkRunner = new Thread("Test sink runner") {
-			@Override
-			public void run() {
-				try {
-					// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
-					simpleSink.invoke(TEST_MESSAGE + '\n');
-				}
-				catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-
-		sinkRunner.start();
-
-		Socket sk = server.accept();
-		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
-		String value = rdr.readLine();
-
-		sinkRunner.join();
-		simpleSink.close();
-		server.close();
-
-		if (error.get() != null) {
-			Throwable t = error.get();
-			t.printStackTrace();
-			fail("Error in spawned thread: " + t.getMessage());
-		}
-
-		assertEquals(TEST_MESSAGE, value);
-	}
-
-	@Test
-	public void testSocketSinkNoRetry() throws Exception {
-		final ServerSocket server = new ServerSocket(0);
-		final int port = server.getLocalPort();
-
-		try {
-			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-			Thread serverRunner = new Thread("Test server runner") {
-
-				@Override
-				public void run() {
-					try {
-						Socket sk = server.accept();
-						sk.close();
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			serverRunner.start();
-
-			SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
-			simpleSink.open(new Configuration());
-
-			// wait socket server to close
-			serverRunner.join();
-			if (error.get() != null) {
-				Throwable t = error.get();
-				t.printStackTrace();
-				fail("Error in server thread: " + t.getMessage());
-			}
-
-			try {
-				// socket should be closed, so this should trigger a re-try
-				// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
-				while (true) { // we have to do this more often as the server side closed is not guaranteed to be noticed immediately
-					simpleSink.invoke(TEST_MESSAGE + '\n');
-				}
-			}
-			catch (IOException e) {
-				// check whether throw a exception that reconnect failed.
-				assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
-			}
-			catch (Exception e) {
-				fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
-			}
-
-			assertEquals(0, simpleSink.getCurrentNumberOfRetries());
-		}
-		finally {
-			IOUtils.closeQuietly(server);
-		}
-	}
-
-	@Test
-	public void testRetry() throws Exception {
-
-		final ServerSocket[] serverSocket = new ServerSocket[1];
-		final ExecutorService[] executor = new ExecutorService[1];
-
-		try {
-			serverSocket[0] = new ServerSocket(0);
-			executor[0] = Executors.newCachedThreadPool();
-
-			int port = serverSocket[0].getLocalPort();
-
-			Callable<Void> serverTask = new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					Socket socket = serverSocket[0].accept();
-
-					BufferedReader reader = new BufferedReader(new InputStreamReader(
-							socket.getInputStream()));
-
-					String value = reader.readLine();
-					assertEquals("0", value);
-
-					socket.close();
-					return null;
-				}
-			};
-
-			Future<Void> serverFuture = executor[0].submit(serverTask);
-
-			final SocketClientSink<String> sink = new SocketClientSink<>(
-					host, serverSocket[0].getLocalPort(), simpleSchema, -1, true);
-
-			// Create the connection
-			sink.open(new Configuration());
-
-			// Initial payload => this will be received by the server an then the socket will be
-			// closed.
-			sink.invoke("0\n");
-
-			// Get future an make sure there was no problem. This will rethrow any Exceptions from
-			// the server.
-			serverFuture.get();
-
-			// Shutdown the server socket
-			serverSocket[0].close();
-			assertTrue(serverSocket[0].isClosed());
-
-			// No retries expected at this point
-			assertEquals(0, sink.getCurrentNumberOfRetries());
-
-			final CountDownLatch retryLatch = new CountDownLatch(1);
-			final CountDownLatch again = new CountDownLatch(1);
-
-			Callable<Void> sinkTask = new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					// Send next payload => server is down, should try to reconnect.
-
-					// We need to send more than just one packet to notice the closed connection.
-					while (retryLatch.getCount() != 0) {
-						sink.invoke("1\n");
-					}
-
-					return null;
-				}
-			};
-
-			Future<Void> sinkFuture = executor[0].submit(sinkTask);
-
-			while (sink.getCurrentNumberOfRetries() == 0) {
-				// Wait for a retry
-				Thread.sleep(100);
-			}
-
-			// OK the poor guy retried to write
-			retryLatch.countDown();
-
-			// Restart the server
-			serverSocket[0] = new ServerSocket(port);
-			Socket socket = serverSocket[0].accept();
-
-			BufferedReader reader = new BufferedReader(new InputStreamReader(
-					socket.getInputStream()));
-
-			// Wait for the reconnect
-			String value = reader.readLine();
-
-			assertEquals("1", value);
-
-			// OK the sink re-connected. :)
-		}
-		finally {
-			if (serverSocket[0] != null) {
-				serverSocket[0].close();
-			}
-
-			if (executor[0] != null) {
-				executor[0].shutdown();
-			}
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
deleted file mode 100644
index 2d9921a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *	 http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Test;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction}.
- */
-public class FileMonitoringFunctionTest {
-
-	@Test
-	public void testForEmptyLocation() throws Exception {
-		final FileMonitoringFunction fileMonitoringFunction
-			= new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
-
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    Thread.sleep(1000L);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                fileMonitoringFunction.cancel();
-            }
-        }.start();
-
-		fileMonitoringFunction.run(
-            new StreamSource.NonWatermarkContext<Tuple3<String, Long, Long>>(
-                new Object(),
-                new Output<StreamRecord<Tuple3<String, Long, Long>>>() {
-                    @Override
-                    public void emitWatermark(Watermark mark) { }
-                    @Override
-                    public void collect(StreamRecord<Tuple3<String, Long, Long>> record) { }
-                    @Override
-                    public void close() { }
-                })
-        );
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
deleted file mode 100644
index 3398451..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.commons.io.IOUtils;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import org.junit.Test;
-
-import java.io.EOFException;
-import java.io.OutputStreamWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
- */
-public class SocketTextStreamFunctionTest {
-
-	private static final String LOCALHOST = "127.0.0.1";
-
-
-	@Test
-	public void testSocketSourceSimpleOutput() throws Exception {
-		ServerSocket server = new ServerSocket(0);
-		Socket channel = null;
-		
-		try {
-			SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0);
-	
-			SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
-			runner.start();
-	
-			channel = server.accept();
-			OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
-			
-			writer.write("test1\n");
-			writer.write("check\n");
-			writer.flush();
-			runner.waitForNumElements(2);
-
-			runner.cancel();
-			runner.interrupt();
-			
-			runner.waitUntilDone();
-			
-			channel.close();
-		}
-		finally {
-			if (channel != null) {
-				IOUtils.closeQuietly(channel);
-			}
-			IOUtils.closeQuietly(server);
-		}
-	}
-
-	@Test
-	public void testExitNoRetries() throws Exception {
-		ServerSocket server = new ServerSocket(0);
-		Socket channel = null;
-
-		try {
-			SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 0);
-
-			SocketSourceThread runner = new SocketSourceThread(source);
-			runner.start();
-
-			channel = server.accept();
-			channel.close();
-			
-			try {
-				runner.waitUntilDone();
-			}
-			catch (Exception e) {
-				assertTrue(e.getCause() instanceof EOFException);
-			}
-		}
-		finally {
-			if (channel != null) {
-				IOUtils.closeQuietly(channel);
-			}
-			IOUtils.closeQuietly(server);
-		}
-	}
-
-	@Test
-	public void testSocketSourceOutputWithRetries() throws Exception {
-		ServerSocket server = new ServerSocket(0);
-		Socket channel = null;
-
-		try {
-			SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100);
-
-			SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
-			runner.start();
-
-			// first connection: nothing
-			channel = server.accept();
-			channel.close();
-
-			// second connection: first string
-			channel = server.accept();
-			OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
-			writer.write("test1\n");
-			writer.close();
-			channel.close();
-
-			// third connection: nothing
-			channel = server.accept();
-			channel.close();
-
-			// forth connection: second string
-			channel = server.accept();
-			writer = new OutputStreamWriter(channel.getOutputStream());
-			writer.write("check\n");
-			writer.flush();
-
-			runner.waitForNumElements(2);
-			runner.cancel();
-			runner.waitUntilDone();
-		}
-		finally {
-			if (channel != null) {
-				IOUtils.closeQuietly(channel);
-			}
-			IOUtils.closeQuietly(server);
-		}
-	}
-
-	@Test
-	public void testSocketSourceOutputInfiniteRetries() throws Exception {
-		ServerSocket server = new ServerSocket(0);
-		Socket channel = null;
-
-		try {
-			SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', -1, 100);
-
-			SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
-			runner.start();
-
-			// first connection: nothing
-			channel = server.accept();
-			channel.close();
-
-			// second connection: first string
-			channel = server.accept();
-			OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
-			writer.write("test1\n");
-			writer.close();
-			channel.close();
-
-			// third connection: nothing
-			channel = server.accept();
-			channel.close();
-
-			// forth connection: second string
-			channel = server.accept();
-			writer = new OutputStreamWriter(channel.getOutputStream());
-			writer.write("check\n");
-			writer.flush();
-
-			runner.waitForNumElements(2);
-			runner.cancel();
-			runner.waitUntilDone();
-		}
-		finally {
-			if (channel != null) {
-				IOUtils.closeQuietly(channel);
-			}
-			IOUtils.closeQuietly(server);
-		}
-	}
-
-	@Test
-	public void testSocketSourceOutputAcrossRetries() throws Exception {
-		ServerSocket server = new ServerSocket(0);
-		Socket channel = null;
-
-		try {
-			SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), '\n', 10, 100);
-
-			SocketSourceThread runner = new SocketSourceThread(source, "test1", "check1", "check2");
-			runner.start();
-
-			// first connection: nothing
-			channel = server.accept();
-			channel.close();
-
-			// second connection: first string
-			channel = server.accept();
-			OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
-			writer.write("te");
-			writer.close();
-			channel.close();
-
-			// third connection: nothing
-			channel = server.accept();
-			channel.close();
-
-			// forth connection: second string
-			channel = server.accept();
-			writer = new OutputStreamWriter(channel.getOutputStream());
-			writer.write("st1\n");
-			writer.write("check1\n");
-			writer.write("check2\n");
-			writer.flush();
-
-			runner.waitForNumElements(2);
-			runner.cancel();
-			runner.waitUntilDone();
-		}
-		finally {
-			if (channel != null) {
-				IOUtils.closeQuietly(channel);
-			}
-			IOUtils.closeQuietly(server);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-
-	private static class SocketSourceThread extends Thread {
-		
-		private final Object sync = new Object();
-		
-		private final SocketTextStreamFunction socketSource;
-		
-		private final String[] expectedData;
-		
-		private volatile Throwable error;
-		private volatile int numElementsReceived;
-		private volatile boolean canceled;
-		private volatile boolean done;
-		
-		public SocketSourceThread(SocketTextStreamFunction socketSource, String... expectedData) {
-			this.socketSource = socketSource;
-			this.expectedData = expectedData;
-		}
-
-		public void run() {
-			try {
-				SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
-					
-					private final Object lock = new Object();
-					
-					@Override
-					public void collect(String element) {
-						int pos = numElementsReceived;
-						
-						// make sure waiter know of us
-						synchronized (sync) {
-							numElementsReceived++;
-							sync.notifyAll();
-						}
-						
-						if (expectedData != null && expectedData.length > pos) {
-							assertEquals(expectedData[pos], element);
-						}
-					}
-
-					@Override
-					public void collectWithTimestamp(String element, long timestamp) {
-						collect(element);
-					}
-
-					@Override
-					public void emitWatermark(Watermark mark) {}
-
-					@Override
-					public Object getCheckpointLock() {
-						return lock;
-					}
-
-					@Override
-					public void close() {}
-				};
-				
-				socketSource.run(ctx);
-			}
-			catch (Throwable t) {
-				synchronized (sync) {
-					if (!canceled) {
-						error = t;
-					}
-					sync.notifyAll();
-				}
-			}
-			finally {
-				synchronized (sync) {
-					done = true;
-					sync.notifyAll();
-				}
-			}
-		}
-		
-		public void cancel() {
-			synchronized (sync) {
-				canceled = true;
-				socketSource.cancel();
-				interrupt();
-			}
-		}
-
-		public void waitForNumElements(int numElements) throws InterruptedException {
-			synchronized (sync) {
-				while (error == null && !canceled && !done && numElementsReceived < numElements) {
-					sync.wait();
-				}
-
-				if (error != null) {
-					throw new RuntimeException("Error in source thread", error);
-				}
-				if (canceled) {
-					throw new RuntimeException("canceled");
-				}
-				if (done) {
-					throw new RuntimeException("Exited cleanly before expected number of elements");
-				}
-			}
-		}
-
-		public void waitUntilDone() throws InterruptedException {
-			join();
-
-			if (error != null) {
-				throw new RuntimeException("Error in source thread", error);
-			}
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
deleted file mode 100644
index c98a659..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.ArrayFromTuple;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ArrayFromTupleTest {
-
-	private String[] testStrings;
-
-	@Before
-	public void init() {
-		testStrings = new String[Tuple.MAX_ARITY];
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			testStrings[i] = Integer.toString(i);
-		}
-	}
-
-	@Test
-	public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			Tuple currentTuple = (Tuple) CLASSES[i].newInstance();
-			String[] currentArray = new String[i + 1];
-			for (int j = 0; j <= i; j++) {
-				currentTuple.setField(testStrings[j], j);
-				currentArray[j] = testStrings[j];
-			}
-			arrayEqualityCheck(currentArray, new ArrayFromTuple().extract(currentTuple));
-		}
-	}
-
-	@Test
-	public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
-		Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
-		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
-			currentTuple.setField(testStrings[i], i);
-		}
-
-		String[] expected = { testStrings[5], testStrings[3], testStrings[6], testStrings[7],
-				testStrings[0] };
-		arrayEqualityCheck(expected, new ArrayFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
-
-		String[] expected2 = { testStrings[0], testStrings[Tuple.MAX_ARITY - 1] };
-		arrayEqualityCheck(expected2,
-				new ArrayFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
-
-		String[] expected3 = { testStrings[Tuple.MAX_ARITY - 1], testStrings[0] };
-		arrayEqualityCheck(expected3,
-				new ArrayFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
-
-		String[] expected4 = { testStrings[13], testStrings[4], testStrings[5], testStrings[4],
-				testStrings[2], testStrings[8], testStrings[6], testStrings[2], testStrings[8],
-				testStrings[3], testStrings[5], testStrings[2], testStrings[16], testStrings[4],
-				testStrings[3], testStrings[2], testStrings[6], testStrings[4], testStrings[7],
-				testStrings[4], testStrings[2], testStrings[8], testStrings[7], testStrings[2] };
-		arrayEqualityCheck(expected4, new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
-				4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
-	}
-
-	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
-		assertEquals("The result arrays must have the same length", array1.length, array2.length);
-		for (int i = 0; i < array1.length; i++) {
-			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
-		}
-	}
-
-	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
-			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
-			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
-			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
-			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
-			Tuple24.class, Tuple25.class };
-}