You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:36 UTC

[04/10] flink git commit: [streaming] Added thread-safe list to tests

[streaming] Added thread-safe list to tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b4ee2a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b4ee2a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b4ee2a1

Branch: refs/heads/master
Commit: 6b4ee2a12174adde5ca2badea70f4ebc35aa5ae3
Parents: 633b0d6
Author: Gábor Hermann <re...@gmail.com>
Authored: Fri Mar 13 09:59:09 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/CoStreamTest.java       |  97 +++++++++++++++++
 .../flink/streaming/api/OutputSplitterTest.java | 106 +++++--------------
 .../streaming/api/WindowCrossJoinTest.java      |  67 +++++-------
 .../api/collector/DirectedOutputTest.java       |  39 +++----
 .../streaming/util/TestListResultSink.java      |  78 ++++++++++++++
 .../flink/streaming/util/TestListWrapper.java   |  60 +++++++++++
 6 files changed, 309 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
new file mode 100644
index 0000000..20cd189
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+
+public class CoStreamTest {
+
+	private static final long MEMORY_SIZE = 32;
+
+	private static ArrayList<String> expected;
+
+	public static void main(String[] args) throws InterruptedException {
+		for (int i = 0; i < 200; i++) {
+			test();
+		}
+	}
+
+	//	@Test
+	public static void test() {
+		expected = new ArrayList<String>();
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
+
+		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+		DataStream<Integer> src = env.fromElements(1, 3, 5);
+		DataStream<Integer> src2 = env.fromElements(1, 3, 5);
+
+		DataStream<Integer> grouped = src.groupBy(new KeySelector<Integer, Integer>() {
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		});
+
+		DataStream<Integer> grouped2 = src2.groupBy(new KeySelector<Integer, Integer>() {
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		});
+
+		DataStream<String> connected = grouped.connect(grouped2).map(new CoMapFunction<Integer, Integer, String>() {
+			@Override
+			public String map1(Integer value) {
+				return value.toString();
+			}
+
+			@Override
+			public String map2(Integer value) {
+				return value.toString();
+			}
+		});
+
+		connected.addSink(resultSink);
+
+		connected.print();
+
+		try {
+			env.execute();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+
+		expected = new ArrayList<String>();
+		expected.addAll(Arrays.asList("1", "1", "3", "3", "5", "5"));
+
+		System.out.println(resultSink.getResult());
+		assertEquals(expected, expected);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index a214fbf..cf6bb3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -19,39 +19,34 @@ package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 public class OutputSplitterTest {
 
 	private static final long MEMORYSIZE = 32;
 
-	private static ArrayList<Integer> splitterResult1 = new ArrayList<Integer>();
-	private static ArrayList<Integer> splitterResult2 = new ArrayList<Integer>();
-
-
 	private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
 
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testOnMergedDataStream() throws Exception {
-		splitterResult1.clear();
-		splitterResult2.clear();
+		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
+		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
 
 		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 		env.setBufferTimeout(1);
 
-		DataStream<Integer> d1 = env.fromElements(0,2,4,6,8);
-		DataStream<Integer> d2 = env.fromElements(1,3,5,7,9);
+		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
+		DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
 
 		d1 = d1.merge(d2);
 
@@ -68,19 +63,7 @@ public class OutputSplitterTest {
 				}
 				return s;
 			}
-		}).select(">").addSink(new SinkFunction<Integer>() {
-
-			private static final long serialVersionUID = 5827187510526388104L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult1.add(value);
-			}
-			
-			@Override
-			public void cancel() {
-			}
-		});
+		}).select(">").addSink(splitterResultSink1);
 
 		d1.split(new OutputSelector<Integer>() {
 			private static final long serialVersionUID = -6822487543355994807L;
@@ -95,41 +78,27 @@ public class OutputSplitterTest {
 				}
 				return s;
 			}
-		}).select("yes").addSink(new SinkFunction<Integer>() {
-			private static final long serialVersionUID = -2674335071267854599L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult2.add(value);
-			}
-			
-			@Override
-			public void cancel() {
-			}
-		});
+		}).select("yes").addSink(splitterResultSink2);
 		env.execute();
 
-		Collections.sort(splitterResult1);
-		Collections.sort(splitterResult2);
-
 		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(5,6,7,8,9));
-		assertEquals(expectedSplitterResult, splitterResult1);
+		expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
+		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
 
 		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0,3,6,9));
-		assertEquals(expectedSplitterResult, splitterResult2);
+		expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
+		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
 	}
 
 	@Test
 	public void testOnSingleDataStream() throws Exception {
-		splitterResult1.clear();
-		splitterResult2.clear();
+		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
+		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
 
 		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 		env.setBufferTimeout(1);
 
-		DataStream<Integer> ds = env.fromElements(0,1,2,3,4,5,6,7,8,9);
+		DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
 
 		ds.split(new OutputSelector<Integer>() {
 			private static final long serialVersionUID = 2524335410904414121L;
@@ -144,19 +113,7 @@ public class OutputSplitterTest {
 				}
 				return s;
 			}
-		}).select("even").addSink(new SinkFunction<Integer>() {
-
-			private static final long serialVersionUID = -2995092337537209535L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult1.add(value);
-			}
-			
-			@Override
-			public void cancel() {
-			}
-		});
+		}).select("even").addSink(splitterResultSink1);
 
 		ds.split(new OutputSelector<Integer>() {
 
@@ -172,30 +129,15 @@ public class OutputSplitterTest {
 				}
 				return s;
 			}
-		}).select("yes").addSink(new SinkFunction<Integer>() {
-
-			private static final long serialVersionUID = -1749077049727705424L;
-
-			@Override
-			public void invoke(Integer value) {
-				splitterResult2.add(value);
-			}
-			
-			@Override
-			public void cancel() {
-			}
-		});
+		}).select("yes").addSink(splitterResultSink2);
 		env.execute();
 
-		Collections.sort(splitterResult1);
-		Collections.sort(splitterResult2);
-
 		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0,2,4,6,8));
-		assertEquals(expectedSplitterResult, splitterResult1);
+		expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
+		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
 
 		expectedSplitterResult.clear();
-		expectedSplitterResult.addAll(Arrays.asList(0,4,8));
-		assertEquals(expectedSplitterResult, splitterResult2);
+		expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
+		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index bd97917..cccec40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -24,12 +24,13 @@ import java.util.ArrayList;
 import java.util.HashSet;
 
 import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.TestListResultSink;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
@@ -39,17 +40,29 @@ public class WindowCrossJoinTest implements Serializable {
 
 	private static final long MEMORYSIZE = 32;
 
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
 	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-
-	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
 	private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
 
+	private static class MyTimestamp<T> implements Timestamp<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long getTimestamp(T value) {
+			return 101L;
+		}
+	}
+
 	@Test
 	public void test() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 		env.setBufferTimeout(1);
 
+		TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> joinResultSink =
+				new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
+		TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> crossResultSink =
+				new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
+
 		ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
 		ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();
 
@@ -101,7 +114,8 @@ public class WindowCrossJoinTest implements Serializable {
 				.join(inStream2)
 				.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
 						new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
-				.addSink(new JoinResultSink());
+				.map(new ResultMap())
+				.addSink(joinResultSink);
 
 		inStream1
 				.cross(inStream2)
@@ -116,50 +130,27 @@ public class WindowCrossJoinTest implements Serializable {
 							Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
 						return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
 					}
-				}).addSink(new CrossResultSink());
+				})
+				.map(new ResultMap())
+				.addSink(crossResultSink);
 
 		env.execute();
 
 		assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults),
-				new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResults));
+				new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResultSink.getResult()));
 		assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossExpectedResults),
-				new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResults));
-	}
-
-	private static class MyTimestamp<T> implements Timestamp<T> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(T value) {
-			return 101L;
-		}
+				new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResultSink.getResult()));
 	}
 
-	private static class JoinResultSink implements
-			SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
+	private static class ResultMap implements
+			MapFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>,
+					Tuple2<Tuple2<Integer, String>, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
-			joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
-		}
-
-		@Override
-		public void cancel() {
+		public Tuple2<Tuple2<Integer, String>, Integer> map(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) throws Exception {
+			return new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0);
 		}
 	}
 
-	private static class CrossResultSink implements
-			SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
-			crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
-		}
-
-		@Override
-		public void cancel() {
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 9d166e5..ffc7c74 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -17,11 +17,7 @@
 
 package org.apache.flink.streaming.api.collector;
 
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,15 +26,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
 
 public class DirectedOutputTest {
 
 	private static final String TEN = "ten";
 	private static final String ODD = "odd";
-	private static final String ALL = "all";
-	private static final String EVEN_AND_ODD = "evenAndOdd";
-	private static final String ODD_AND_TEN = "oddAndTen";
 	private static final String EVEN = "even";
 	private static final String NON_SELECTED = "nonSelected";
 
@@ -98,19 +96,24 @@ public class DirectedOutputTest {
 
 	@Test
 	public void outputSelectorTest() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, 32);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
+
+		TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
 
 		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
-		source.select(EVEN).addSink(new ListSink(EVEN));
-		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
-		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
-		source.addSink(new ListSink(ALL));
+		source.select(EVEN).addSink(evenSink);
+		source.select(ODD, TEN).addSink(oddAndTenSink);
+		source.select(EVEN, ODD).addSink(evenAndOddSink);
+		source.addSink(allSink);
 
 		env.execute();
-		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
-		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
+		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
 		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
-				outputs.get(EVEN_AND_ODD));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
+				evenAndOddSink.getResult());
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), allSink.getResult());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
new file mode 100644
index 0000000..7f72173
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+
+public class TestListResultSink<T> extends RichSinkFunction<T> {
+
+	private int resultListId;
+
+	public TestListResultSink() {
+		this.resultListId = TestListWrapper.getInstance().createList();
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+	}
+
+	@Override
+	public void invoke(T value) throws Exception {
+		synchronized (resultList()) {
+			resultList().add(value);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+	}
+
+	private List<T> resultList() {
+		synchronized (TestListWrapper.getInstance()) {
+			return (List<T>) TestListWrapper.getInstance().getList(resultListId);
+		}
+	}
+
+	public List<T> getResult() {
+		synchronized (resultList()) {
+			ArrayList<T> copiedList = new ArrayList<T>(resultList());
+			return copiedList;
+		}
+	}
+
+	public List<T> getSortedResult() {
+		synchronized (resultList()) {
+			TreeSet<T> treeSet = new TreeSet<T>(resultList());
+			ArrayList<T> sortedList = new ArrayList<T>(treeSet);
+			return sortedList;
+		}
+	}
+
+	@Override
+	public void cancel() {
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
new file mode 100644
index 0000000..3c50f63
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestListWrapper {
+
+	private static TestListWrapper instance;
+
+	private List<List<? extends Comparable>> lists;
+
+	private TestListWrapper() {
+		lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>());
+	}
+
+	public static TestListWrapper getInstance() {
+		if (instance == null) {
+			instance = new TestListWrapper();
+		}
+		return instance;
+	}
+
+	/**
+	 * Creates and stores a list, returns with the id.
+	 *
+	 * @return The ID of the list.
+	 */
+	public int createList() {
+		lists.add(new ArrayList<Comparable>());
+		return lists.size() - 1;
+	}
+
+	public List<?> getList(int listId) {
+		List<? extends Comparable> list = lists.get(listId);
+		if (list == null) {
+			throw new RuntimeException("No such list.");
+		}
+
+		return list;
+	}
+
+}
\ No newline at end of file