You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:41 UTC

[25/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
deleted file mode 100644
index 4dbf7b8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link CoStreamFlatMap}. These test that:
- *
- * <ul>
- *     <li>RichFunction methods are called correctly</li>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class CoStreamFlatMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(String value, Collector<String> coll) {
-			for (int i = 0; i < value.length(); i++) {
-				coll.collect(value.substring(i, i + 1));
-			}
-		}
-
-		@Override
-		public void flatMap2(Integer value, Collector<String> coll) {
-			coll.collect(value.toString());
-		}
-	}
-
-	@Test
-	public void testCoFlatMap() throws Exception {
-		CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new MyCoFlatMap());
-
-		TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<String>("abc", initialTime + 1));
-		testHarness.processElement1(new StreamRecord<String>("def", initialTime + 2));
-		testHarness.processWatermark1(new Watermark(initialTime + 2));
-		testHarness.processElement1(new StreamRecord<String>("ghi", initialTime + 3));
-
-		testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1));
-		testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2));
-		testHarness.processWatermark2(new Watermark(initialTime + 3));
-		testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3));
-		testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4));
-		testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5));
-
-		expectedOutput.add(new StreamRecord<String>("a", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("b", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("c", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("d", initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("e", initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("f", initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("g", initialTime + 3));
-		expectedOutput.add(new StreamRecord<String>("h", initialTime + 3));
-		expectedOutput.add(new StreamRecord<String>("i", initialTime + 3));
-
-		expectedOutput.add(new StreamRecord<String>("1", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("2", initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("3", initialTime + 3));
-		expectedOutput.add(new StreamRecord<String>("4", initialTime + 4));
-		expectedOutput.add(new StreamRecord<String>("5", initialTime + 5));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-		CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new TestOpenCloseCoFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
-
-		long initialTime = 0L;
-
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<String>("Hello", initialTime));
-		testHarness.processElement2(new StreamRecord<Integer>(42, initialTime));
-
-		testHarness.close();
-
-		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoFlatMapFunction.closeCalled);
-		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
-	}
-
-	// This must only be used in one test, otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseCoFlatMapFunction extends RichCoFlatMapFunction<String, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public void flatMap1(String value, Collector<String> out) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			out.collect(value);
-		}
-
-		@Override
-		public void flatMap2(Integer value, Collector<String> out) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			out.collect(value.toString());
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
deleted file mode 100644
index 28ae664..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.api.operators.co.CoStreamMap}. These test that:
- *
- * <ul>
- *     <li>RichFunction methods are called correctly</li>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class CoStreamMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(Double value) {
-			return value.toString();
-		}
-
-		@Override
-		public String map2(Integer value) {
-			return value.toString();
-		}
-	}
-
-
-	@Test
-	public void testCoMap() throws Exception {
-		CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new MyCoMap());
-
-		TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<Double>(1.1d, initialTime + 1));
-		testHarness.processElement1(new StreamRecord<Double>(1.2d, initialTime + 2));
-		testHarness.processElement1(new StreamRecord<Double>(1.3d, initialTime + 3));
-		testHarness.processWatermark1(new Watermark(initialTime + 3));
-		testHarness.processElement1(new StreamRecord<Double>(1.4d, initialTime + 4));
-		testHarness.processElement1(new StreamRecord<Double>(1.5d, initialTime + 5));
-
-		testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1));
-		testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2));
-		testHarness.processWatermark2(new Watermark(initialTime + 2));
-		testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3));
-		testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4));
-		testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5));
-
-		expectedOutput.add(new StreamRecord<String>("1.1", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("1.2", initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("1.3", initialTime + 3));
-		expectedOutput.add(new StreamRecord<String>("1.4", initialTime + 4));
-		expectedOutput.add(new StreamRecord<String>("1.5", initialTime + 5));
-
-		expectedOutput.add(new StreamRecord<String>("1", initialTime + 1));
-		expectedOutput.add(new StreamRecord<String>("2", initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<String>("3", initialTime + 3));
-		expectedOutput.add(new StreamRecord<String>("4", initialTime + 4));
-		expectedOutput.add(new StreamRecord<String>("5", initialTime + 5));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	@Test
-	public void testOpenClose() throws Exception {
-		CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new TestOpenCloseCoMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
-
-		long initialTime = 0L;
-
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<Double>(74d, initialTime));
-		testHarness.processElement2(new StreamRecord<Integer>(42, initialTime));
-
-		testHarness.close();
-
-		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoMapFunction.closeCalled);
-		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
-	}
-
-	// This must only be used in one test, otherwise the static fields will be changed
-	// by several tests concurrently
-	private static class TestOpenCloseCoMapFunction extends RichCoMapFunction<Double, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public String map1(Double value) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return value.toString();
-		}
-
-		@Override
-		public String map2(Integer value) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return value.toString();
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
deleted file mode 100644
index 130842e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.api.operators.co;
-//
-//import static org.junit.Assert.assertEquals;
-//
-//import java.util.ArrayList;
-//import java.util.HashSet;
-//import java.util.List;
-//import java.util.Set;
-//
-//import org.apache.flink.api.java.tuple.Tuple2;
-//import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-//import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
-//import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-//import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-//import org.apache.flink.streaming.util.MockCoContext;
-//import org.apache.flink.util.Collector;
-//import org.junit.Test;
-//
-//public class CoWindowTest {
-//
-//	public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
-//
-//		private static final long serialVersionUID = 1L;
-//
-//		@SuppressWarnings("unused")
-//		@Override
-//		public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
-//				throws Exception {
-//			Integer count1 = 0;
-//			for (Integer i : first) {
-//				count1++;
-//			}
-//			Integer count2 = 0;
-//			for (Integer i : second) {
-//				count2++;
-//			}
-//			out.collect(count1);
-//			out.collect(count2);
-//
-//		}
-//
-//	}
-//
-//	public static final class MyCoGroup2 implements
-//			CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
-//
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public void coWindow(List<Tuple2<Integer, Integer>> first,
-//				List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
-//
-//			Set<Integer> firstElements = new HashSet<Integer>();
-//			for (Tuple2<Integer, Integer> value : first) {
-//				firstElements.add(value.f1);
-//			}
-//			for (Tuple2<Integer, Integer> value : second) {
-//				if (firstElements.contains(value.f1)) {
-//					out.collect(value.f1);
-//				}
-//			}
-//
-//		}
-//
-//	}
-//
-//	private static final class MyTS1 implements Timestamp<Integer> {
-//
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public long getTimestamp(Integer value) {
-//			return value;
-//		}
-//
-//	}
-//
-//	private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
-//
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public long getTimestamp(Tuple2<Integer, Integer> value) {
-//			return value.f0;
-//		}
-//
-//	}
-//
-//	@Test
-//	public void coWindowGroupReduceTest2() throws Exception {
-//
-//		CoStreamWindow<Integer, Integer, Integer> invokable1 = new CoStreamWindow<Integer, Integer, Integer>(
-//				new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
-//				new TimestampWrapper<Integer>(new MyTS1(), 1));
-//
-//		// Windowsize 2, slide 1
-//		// 1,2|2,3|3,4|4,5
-//
-//		List<Integer> input11 = new ArrayList<Integer>();
-//		input11.add(1);
-//		input11.add(1);
-//		input11.add(2);
-//		input11.add(3);
-//		input11.add(3);
-//
-//		List<Integer> input12 = new ArrayList<Integer>();
-//		input12.add(1);
-//		input12.add(2);
-//		input12.add(3);
-//		input12.add(3);
-//		input12.add(5);
-//
-//		// Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5)
-//		// expected output: 3,2|3,3|2,2|0,1
-//
-//		List<Integer> expected1 = new ArrayList<Integer>();
-//		expected1.add(3);
-//		expected1.add(2);
-//		expected1.add(3);
-//		expected1.add(3);
-//		expected1.add(2);
-//		expected1.add(2);
-//		expected1.add(0);
-//		expected1.add(1);
-//
-//		List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
-//		assertEquals(expected1, actual1);
-//
-//		CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
-//				new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
-//						1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
-//
-//		// WindowSize 2, slide 3
-//		// 1,2|4,5|7,8|
-//
-//		List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
-//		input21.add(new Tuple2<Integer, Integer>(1, 1));
-//		input21.add(new Tuple2<Integer, Integer>(1, 2));
-//		input21.add(new Tuple2<Integer, Integer>(2, 3));
-//		input21.add(new Tuple2<Integer, Integer>(3, 4));
-//		input21.add(new Tuple2<Integer, Integer>(3, 5));
-//		input21.add(new Tuple2<Integer, Integer>(4, 6));
-//		input21.add(new Tuple2<Integer, Integer>(4, 7));
-//		input21.add(new Tuple2<Integer, Integer>(5, 8));
-//
-//		List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
-//		input22.add(new Tuple2<Integer, Integer>(1, 1));
-//		input22.add(new Tuple2<Integer, Integer>(2, 0));
-//		input22.add(new Tuple2<Integer, Integer>(2, 2));
-//		input22.add(new Tuple2<Integer, Integer>(3, 9));
-//		input22.add(new Tuple2<Integer, Integer>(3, 4));
-//		input22.add(new Tuple2<Integer, Integer>(4, 10));
-//		input22.add(new Tuple2<Integer, Integer>(5, 8));
-//		input22.add(new Tuple2<Integer, Integer>(5, 7));
-//
-//		List<Integer> expected2 = new ArrayList<Integer>();
-//		expected2.add(1);
-//		expected2.add(2);
-//		expected2.add(8);
-//		expected2.add(7);
-//
-//		List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
-//		assertEquals(expected2, actual2);
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
deleted file mode 100644
index d00dc67..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class SelfConnectionTest extends StreamingMultipleProgramsTestBase {
-
-	private static List<String> expected;
-
-	/**
-	 * We connect two different data streams in a chain to a CoMap.
-	 */
-	@Test
-	public void differentDataStreamSameChain() {
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		DataStream<String> stringMap = src.map(new MapFunction<Integer, String>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map(Integer value) throws Exception {
-				return "x " + value;
-			}
-		});
-
-		stringMap.connect(src).map(new CoMapFunction<String, Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map1(String value) {
-				return value;
-			}
-
-			@Override
-			public String map2(Integer value) {
-				return String.valueOf(value + 1);
-			}
-		}).addSink(resultSink);
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
-		expected = new ArrayList<String>();
-
-		expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
-		List<String> result = resultSink.getResult();
-
-		Collections.sort(expected);
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-
-	/**
-	 * We connect two different data streams in different chains to a CoMap.
-	 * (This is not actually self-connect.)
-	 */
-	@Test
-	public void differentDataStreamDifferentChain() {
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining();
-
-		DataStream<String> stringMap = src.flatMap(new FlatMapFunction<Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap(Integer value, Collector<String> out) throws Exception {
-				out.collect("x " + value);
-			}
-		}).keyBy(new KeySelector<String, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(String value) throws Exception {
-				return value.length();
-			}
-		});
-
-		DataStream<Long> longMap = src.map(new MapFunction<Integer, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long map(Integer value) throws Exception {
-				return Long.valueOf(value + 1);
-			}
-		}).keyBy(new KeySelector<Long, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long getKey(Long value) throws Exception {
-				return value;
-			}
-		});
-
-
-		stringMap.connect(longMap).map(new CoMapFunction<String, Long, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map1(String value) {
-				return value;
-			}
-
-			@Override
-			public String map2(Long value) {
-				return value.toString();
-			}
-		}).addSink(resultSink);
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
-		expected = new ArrayList<String>();
-
-		expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
-		List<String> result = resultSink.getResult();
-
-		Collections.sort(expected);
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
deleted file mode 100644
index 5377e09..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.outputformat;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-public class CsvOutputFormatITCase extends StreamingProgramTestBase {
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> text = env.fromElements(WordCountData.TEXT);
-
-		DataStream<Tuple2<String, Integer>> counts = text
-				.flatMap(new Tokenizer())
-				.keyBy(0).sum(1);
-
-		counts.writeAsCsv(resultPath);
-
-		env.execute("WriteAsCsvTest");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		//Strip the parentheses from the expected text like output
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
-				.replaceAll("[\\\\(\\\\)]", ""), resultPath);
-	}
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
deleted file mode 100644
index 49876ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.outputformat;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.junit.Ignore;
-
-@Ignore
-//This test sometimes failes most likely due to the behaviour
-//of the socket. Disabled for now.
-public class SocketOutputFormatITCase extends SocketOutputTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> text = env.fromElements(WordCountData.TEXT);
-
-		DataStream<String> counts =
-				text.flatMap(new CsvOutputFormatITCase.Tokenizer())
-						.keyBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
-					@Override
-					public String map(Tuple2<String, Integer> value) throws Exception {
-						return value.toString() + "\n";
-					}
-				});
-		counts.writeToSocket(HOST, port, new DummyStringSchema());
-
-		env.execute("WriteToSocketTest");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
deleted file mode 100644
index 380f00d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.outputformat;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-public class TextOutputFormatITCase extends StreamingProgramTestBase {
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> text = env.fromElements(WordCountData.TEXT);
-
-		DataStream<Tuple2<String, Integer>> counts = text
-				.flatMap(new CsvOutputFormatITCase.Tokenizer())
-				.keyBy(0).sum(1);
-
-		counts.writeAsText(resultPath);
-
-		env.execute("WriteAsTextTest");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
deleted file mode 100644
index 317a21c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamtask;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-
-public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
-
-	public ArrayList<Integer> emittedRecords;
-
-	public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
-		super(inputBase.getEnvironment().getWriter(0));
-	}
-
-	public boolean initList() {
-		emittedRecords = new ArrayList<Integer>();
-		return true;
-	}
-	
-	@Override
-	public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
-		emittedRecords.add(record.getInstance().getValue().f0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
deleted file mode 100644
index 8f5f8df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamIterationHeadTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamtask;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ ResultPartitionWriter.class })
-public class StreamIterationHeadTest {
-
-	@Test
-	public void testIterationHeadWatermarkEmission() throws Exception {
-		StreamIterationHead<Integer> head = new StreamIterationHead<>();
-		StreamTaskTestHarness<Integer> harness = new StreamTaskTestHarness<>(head,
-				BasicTypeInfo.INT_TYPE_INFO);
-		harness.getStreamConfig().setIterationId("1");
-		harness.getStreamConfig().setIterationWaitTime(1);
-
-		harness.invoke();
-		harness.waitForTaskCompletion();
-
-		assertEquals(1, harness.getOutput().size());
-		assertEquals(new Watermark(Long.MAX_VALUE), harness.getOutput().peek());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
deleted file mode 100644
index 122aa8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamtask;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class StreamVertexTest extends StreamingMultipleProgramsTestBase {
-
-	private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
-
-	public static class MySource implements SourceFunction<Tuple1<Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
-
-		private int i = 0;
-
-		@Override
-		public void run(SourceContext<Tuple1<Integer>> ctx) throws Exception {
-			for (int i = 0; i < 10; i++) {
-				tuple.f0 = i;
-				ctx.collect(tuple);
-			}
-		}
-
-		@Override
-		public void cancel() {
-		}
-	}
-
-	public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
-			Integer i = value.f0;
-			return new Tuple2<Integer, Integer>(i, i + 1);
-		}
-	}
-
-	public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Tuple2<Integer, Integer> tuple) {
-			Integer k = tuple.getField(0);
-			Integer v = tuple.getField(1);
-			data.put(k, v);
-		}
-
-	}
-
-	@SuppressWarnings("unused")
-	private static final int SOURCE_PARALELISM = 1;
-
-	@Test
-	public void wrongJobGraph() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(SOURCE_PARALELISM);
-
-		try {
-			env.fromCollection(null);
-			fail();
-		} catch (NullPointerException e) {
-		}
-
-		try {
-			env.fromElements();
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-
-		try {
-			env.generateSequence(-10, -30);
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-
-		try {
-			env.setBufferTimeout(-10);
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-
-		try {
-			env.generateSequence(1, 10).project(2);
-			fail();
-		} catch (RuntimeException e) {
-		}
-	}
-
-	private static class CoMap implements CoMapFunction<String, Long, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(String value) {
-			// System.out.println(value);
-			return value;
-		}
-
-		@Override
-		public String map2(Long value) {
-			// System.out.println(value);
-			return value.toString();
-		}
-	}
-
-	private static class SetSink implements SinkFunction<String> {
-		private static final long serialVersionUID = 1L;
-		public static Set<String> result = Collections.synchronizedSet(new HashSet<String>());
-
-		@Override
-		public void invoke(String value) {
-			result.add(value);
-		}
-
-	}
-
-	@Test
-	public void coTest() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(SOURCE_PARALELISM);
-
-		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
-		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
-
-		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
-
-		env.execute();
-
-		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
-				"2", "3"));
-		assertEquals(expectedSet, SetSink.result);
-	}
-
-	@Test
-	public void runStream() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(SOURCE_PARALELISM);
-
-		env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask())
-				.addSink(new MySink());
-
-		env.execute();
-		assertEquals(10, data.keySet().size());
-
-		for (Integer k : data.keySet()) {
-			assertEquals((Integer) (k + 1), data.get(k));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
deleted file mode 100644
index bdc7e94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.CosineDistance;
-import org.junit.Test;
-
-public class CosineDistanceTest {
-	
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testCosineDistance() {
-		
-		//Reference calculated using wolfram alpha
-		double[][][] testdata={
-				{{0,0,0},{0,0,0}},
-				{{0,0,0},{1,2,3}},
-				{{1,2,3},{0,0,0}},
-				{{1,2,3},{4,5,6}},
-				{{1,2,3},{-4,-5,-6}},
-				{{1,2,-3},{-4,5,-6}},
-				{{1,2,3,4},{5,6,7,8}},
-				{{1,2},{3,4}},
-				{{1},{2}},
-			};
-		double[] referenceSolutions={
-				0,
-				0,
-				0,
-				0.025368,
-				1.974631,
-				0.269026,
-				0.031136,
-				0.016130,
-				0
-		};
-		
-		for (int i = 0; i < testdata.length; i++) {
-			assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
-					+ arrayToString(testdata[i][0]), referenceSolutions[i],
-					new CosineDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
-		}
-	}
-	
-	private String arrayToString(double[] in){
-		if (in.length==0) return "{}";
-		String result="{";
-		for (double d:in){
-			result+=d+",";
-		}
-		return result.substring(0, result.length()-1)+"}";
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
deleted file mode 100644
index 85a0882..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.EuclideanDistance;
-import org.junit.Test;
-
-public class EuclideanDistanceTest {
-	
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testEuclideanDistance() {
-		
-		//Reference calculated using wolfram alpha
-		double[][][] testdata={
-				{{0,0,0},{0,0,0}},
-				{{0,0,0},{1,2,3}},
-				{{1,2,3},{0,0,0}},
-				{{1,2,3},{4,5,6}},
-				{{1,2,3},{-4,-5,-6}},
-				{{1,2,-3},{-4,5,-6}},
-				{{1,2,3,4},{5,6,7,8}},
-				{{1,2},{3,4}},
-				{{1},{2}},
-			};
-		double[] referenceSolutions={
-				0,
-				3.741657,
-				3.741657,
-				5.196152,
-				12.4499,
-				6.557439,
-				8.0,
-				2.828427,
-				1
-		};
-		
-		for (int i = 0; i < testdata.length; i++) {
-			assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
-					+ arrayToString(testdata[i][0]), referenceSolutions[i],
-					new EuclideanDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
-		}
-		
-	}
-	
-	private String arrayToString(double[] in){
-		if (in.length==0) return "{}";
-		String result="{";
-		for (double d:in){
-			result+=d+",";
-		}
-		return result.substring(0, result.length()-1)+"}";
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
deleted file mode 100644
index 9d9d47b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.graph;
-
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class TranslationTest extends StreamingMultipleProgramsTestBase {
-	
-	@Test
-	public void testCheckpointModeTranslation() {
-		try {
-			// with deactivated fault tolerance, the checkpoint mode should be at-least-once
-			StreamExecutionEnvironment deactivated = getSimpleJob();
-			
-			for (JobVertex vertex : deactivated.getStreamGraph().getJobGraph().getVertices()) {
-				assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
-			}
-
-			// with activated fault tolerance, the checkpoint mode should be by default exactly once
-			StreamExecutionEnvironment activated = getSimpleJob();
-			activated.enableCheckpointing(1000L);
-			for (JobVertex vertex : activated.getStreamGraph().getJobGraph().getVertices()) {
-				assertEquals(CheckpointingMode.EXACTLY_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
-			}
-
-			// explicitly setting the mode
-			StreamExecutionEnvironment explicit = getSimpleJob();
-			explicit.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
-			for (JobVertex vertex : explicit.getStreamGraph().getJobGraph().getVertices()) {
-				assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private static StreamExecutionEnvironment getSimpleJob() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.generateSequence(1, 10000000)
-				.addSink(new SinkFunction<Long>() {
-					@Override
-					public void invoke(Long value) {
-					}
-				});
-		
-		return env;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
deleted file mode 100644
index a8a989b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.junit.Test;
-
-/**
- * The test generates two random streams (input channels) which independently
- * and randomly generate checkpoint barriers. The two streams are very
- * unaligned, putting heavy work on the BarrierBuffer.
- */
-public class BarrierBufferMassiveRandomTest {
-
-	private static final int PAGE_SIZE = 1024;
-	
-	@Test
-	public void testWithTwoChannelsAndRandomBarriers() {
-		IOManager ioMan = null;
-		try {
-			ioMan = new IOManagerAsync();
-			
-			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
-			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
-
-			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
-					new BufferPool[] { pool1, pool2 },
-					new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
-	
-			BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, ioMan);
-			
-			for (int i = 0; i < 2000000; i++) {
-				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
-				if (boe.isBuffer()) {
-					boe.getBuffer().recycle();
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			if (ioMan != null) {
-				ioMan.shutdown();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Mocks and Generators
-	// ------------------------------------------------------------------------
-	
-	protected interface BarrierGenerator {
-		public boolean isNextBarrier();
-	}
-
-	protected static class RandomBarrier implements BarrierGenerator {
-		
-		private static final Random rnd = new Random();
-
-		private final double threshold;
-
-		public RandomBarrier(double expectedEvery) {
-			threshold = 1 / expectedEvery;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return rnd.nextDouble() < threshold;
-		}
-	}
-
-	private static class CountBarrier implements BarrierGenerator {
-
-		private final long every;
-		private long c = 0;
-
-		public CountBarrier(long every) {
-			this.every = every;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return c++ % every == 0;
-		}
-	}
-
-	protected static class RandomGeneratingInputGate implements InputGate {
-
-		private final int numChannels;
-		private final BufferPool[] bufferPools;
-		private final int[] currentBarriers;
-		private final BarrierGenerator[] barrierGens;
-		private int currentChannel = 0;
-		private long c = 0;
-
-		public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
-			this.numChannels = bufferPools.length;
-			this.currentBarriers = new int[numChannels];
-			this.bufferPools = bufferPools;
-			this.barrierGens = barrierGens;
-		}
-
-		@Override
-		public int getNumberOfInputChannels() {
-			return numChannels;
-		}
-
-		@Override
-		public boolean isFinished() {
-			return false;
-		}
-
-		@Override
-		public void requestPartitions() {}
-
-		@Override
-		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-			currentChannel = (currentChannel + 1) % numChannels;
-
-			if (barrierGens[currentChannel].isNextBarrier()) {
-				return new BufferOrEvent(
-						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
-							currentChannel);
-			} else {
-				Buffer buffer = bufferPools[currentChannel].requestBuffer();
-				buffer.getMemorySegment().putLong(0, c++);
-				return new BufferOrEvent(buffer, currentChannel);
-			}
-		}
-
-		@Override
-		public void sendTaskEvent(TaskEvent event) {}
-
-		@Override
-		public void registerListener(EventListener<InputGate> listener) {}
-
-		@Override
-		public int getPageSize() {
-			return PAGE_SIZE;
-		}
-	}
-}