You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:36 UTC
[04/10] flink git commit: [streaming] Added thread-safe list to tests
[streaming] Added thread-safe list to tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b4ee2a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b4ee2a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b4ee2a1
Branch: refs/heads/master
Commit: 6b4ee2a12174adde5ca2badea70f4ebc35aa5ae3
Parents: 633b0d6
Author: Gábor Hermann <re...@gmail.com>
Authored: Fri Mar 13 09:59:09 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:03 2015 +0100
----------------------------------------------------------------------
.../flink/streaming/api/CoStreamTest.java | 97 +++++++++++++++++
.../flink/streaming/api/OutputSplitterTest.java | 106 +++++--------------
.../streaming/api/WindowCrossJoinTest.java | 67 +++++-------
.../api/collector/DirectedOutputTest.java | 39 +++----
.../streaming/util/TestListResultSink.java | 78 ++++++++++++++
.../flink/streaming/util/TestListWrapper.java | 60 +++++++++++
6 files changed, 309 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
new file mode 100644
index 0000000..20cd189
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+
+public class CoStreamTest {
+
+ private static final long MEMORY_SIZE = 32;
+
+ private static ArrayList<String> expected;
+
+ public static void main(String[] args) throws InterruptedException {
+ for (int i = 0; i < 200; i++) {
+ test();
+ }
+ }
+
+ // @Test
+ public static void test() {
+ expected = new ArrayList<String>();
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
+
+ TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+ DataStream<Integer> src = env.fromElements(1, 3, 5);
+ DataStream<Integer> src2 = env.fromElements(1, 3, 5);
+
+ DataStream<Integer> grouped = src.groupBy(new KeySelector<Integer, Integer>() {
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ });
+
+ DataStream<Integer> grouped2 = src2.groupBy(new KeySelector<Integer, Integer>() {
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value;
+ }
+ });
+
+ DataStream<String> connected = grouped.connect(grouped2).map(new CoMapFunction<Integer, Integer, String>() {
+ @Override
+ public String map1(Integer value) {
+ return value.toString();
+ }
+
+ @Override
+ public String map2(Integer value) {
+ return value.toString();
+ }
+ });
+
+ connected.addSink(resultSink);
+
+ connected.print();
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ expected = new ArrayList<String>();
+ expected.addAll(Arrays.asList("1", "1", "3", "3", "5", "5"));
+
+ System.out.println(resultSink.getResult());
+ assertEquals(expected, expected);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index a214fbf..cf6bb3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -19,39 +19,34 @@ package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
public class OutputSplitterTest {
private static final long MEMORYSIZE = 32;
- private static ArrayList<Integer> splitterResult1 = new ArrayList<Integer>();
- private static ArrayList<Integer> splitterResult2 = new ArrayList<Integer>();
-
-
private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
@SuppressWarnings("unchecked")
@Test
public void testOnMergedDataStream() throws Exception {
- splitterResult1.clear();
- splitterResult2.clear();
+ TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
+ TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.setBufferTimeout(1);
- DataStream<Integer> d1 = env.fromElements(0,2,4,6,8);
- DataStream<Integer> d2 = env.fromElements(1,3,5,7,9);
+ DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
+ DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
d1 = d1.merge(d2);
@@ -68,19 +63,7 @@ public class OutputSplitterTest {
}
return s;
}
- }).select(">").addSink(new SinkFunction<Integer>() {
-
- private static final long serialVersionUID = 5827187510526388104L;
-
- @Override
- public void invoke(Integer value) {
- splitterResult1.add(value);
- }
-
- @Override
- public void cancel() {
- }
- });
+ }).select(">").addSink(splitterResultSink1);
d1.split(new OutputSelector<Integer>() {
private static final long serialVersionUID = -6822487543355994807L;
@@ -95,41 +78,27 @@ public class OutputSplitterTest {
}
return s;
}
- }).select("yes").addSink(new SinkFunction<Integer>() {
- private static final long serialVersionUID = -2674335071267854599L;
-
- @Override
- public void invoke(Integer value) {
- splitterResult2.add(value);
- }
-
- @Override
- public void cancel() {
- }
- });
+ }).select("yes").addSink(splitterResultSink2);
env.execute();
- Collections.sort(splitterResult1);
- Collections.sort(splitterResult2);
-
expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(5,6,7,8,9));
- assertEquals(expectedSplitterResult, splitterResult1);
+ expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
+ assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0,3,6,9));
- assertEquals(expectedSplitterResult, splitterResult2);
+ expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
+ assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
}
@Test
public void testOnSingleDataStream() throws Exception {
- splitterResult1.clear();
- splitterResult2.clear();
+ TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
+ TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.setBufferTimeout(1);
- DataStream<Integer> ds = env.fromElements(0,1,2,3,4,5,6,7,8,9);
+ DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
ds.split(new OutputSelector<Integer>() {
private static final long serialVersionUID = 2524335410904414121L;
@@ -144,19 +113,7 @@ public class OutputSplitterTest {
}
return s;
}
- }).select("even").addSink(new SinkFunction<Integer>() {
-
- private static final long serialVersionUID = -2995092337537209535L;
-
- @Override
- public void invoke(Integer value) {
- splitterResult1.add(value);
- }
-
- @Override
- public void cancel() {
- }
- });
+ }).select("even").addSink(splitterResultSink1);
ds.split(new OutputSelector<Integer>() {
@@ -172,30 +129,15 @@ public class OutputSplitterTest {
}
return s;
}
- }).select("yes").addSink(new SinkFunction<Integer>() {
-
- private static final long serialVersionUID = -1749077049727705424L;
-
- @Override
- public void invoke(Integer value) {
- splitterResult2.add(value);
- }
-
- @Override
- public void cancel() {
- }
- });
+ }).select("yes").addSink(splitterResultSink2);
env.execute();
- Collections.sort(splitterResult1);
- Collections.sort(splitterResult2);
-
expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0,2,4,6,8));
- assertEquals(expectedSplitterResult, splitterResult1);
+ expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
+ assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
expectedSplitterResult.clear();
- expectedSplitterResult.addAll(Arrays.asList(0,4,8));
- assertEquals(expectedSplitterResult, splitterResult2);
+ expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
+ assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index bd97917..cccec40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -24,12 +24,13 @@ import java.util.ArrayList;
import java.util.HashSet;
import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
@@ -39,17 +40,29 @@ public class WindowCrossJoinTest implements Serializable {
private static final long MEMORYSIZE = 32;
- private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-
- private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
+ private static class MyTimestamp<T> implements Timestamp<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(T value) {
+ return 101L;
+ }
+ }
+
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.setBufferTimeout(1);
+ TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> joinResultSink =
+ new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
+ TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> crossResultSink =
+ new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>();
+
ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>();
@@ -101,7 +114,8 @@ public class WindowCrossJoinTest implements Serializable {
.join(inStream2)
.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
- .addSink(new JoinResultSink());
+ .map(new ResultMap())
+ .addSink(joinResultSink);
inStream1
.cross(inStream2)
@@ -116,50 +130,27 @@ public class WindowCrossJoinTest implements Serializable {
Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
}
- }).addSink(new CrossResultSink());
+ })
+ .map(new ResultMap())
+ .addSink(crossResultSink);
env.execute();
assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults),
- new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResults));
+ new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResultSink.getResult()));
assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossExpectedResults),
- new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResults));
- }
-
- private static class MyTimestamp<T> implements Timestamp<T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(T value) {
- return 101L;
- }
+ new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResultSink.getResult()));
}
- private static class JoinResultSink implements
- SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
+ private static class ResultMap implements
+ MapFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>,
+ Tuple2<Tuple2<Integer, String>, Integer>> {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
- joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
- }
-
- @Override
- public void cancel() {
+ public Tuple2<Tuple2<Integer, String>, Integer> map(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) throws Exception {
+ return new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0);
}
}
- private static class CrossResultSink implements
- SinkFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
- crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
- }
-
- @Override
- public void cancel() {
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 9d166e5..ffc7c74 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -17,11 +17,7 @@
package org.apache.flink.streaming.api.collector;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,15 +26,17 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
public class DirectedOutputTest {
private static final String TEN = "ten";
private static final String ODD = "odd";
- private static final String ALL = "all";
- private static final String EVEN_AND_ODD = "evenAndOdd";
- private static final String ODD_AND_TEN = "oddAndTen";
private static final String EVEN = "even";
private static final String NON_SELECTED = "nonSelected";
@@ -98,19 +96,24 @@ public class DirectedOutputTest {
@Test
public void outputSelectorTest() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, 32);
+ StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
+
+ TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
+ TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
+ TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
+ TestListResultSink<Long> allSink = new TestListResultSink<Long>();
SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
- source.select(EVEN).addSink(new ListSink(EVEN));
- source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
- source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
- source.addSink(new ListSink(ALL));
+ source.select(EVEN).addSink(evenSink);
+ source.select(ODD, TEN).addSink(oddAndTenSink);
+ source.select(EVEN, ODD).addSink(evenAndOddSink);
+ source.addSink(allSink);
env.execute();
- assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
- assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
+ assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
+ assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
- outputs.get(EVEN_AND_ODD));
- assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
+ evenAndOddSink.getResult());
+ assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), allSink.getResult());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
new file mode 100644
index 0000000..7f72173
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+
+public class TestListResultSink<T> extends RichSinkFunction<T> {
+
+ private int resultListId;
+
+ public TestListResultSink() {
+ this.resultListId = TestListWrapper.getInstance().createList();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ @Override
+ public void invoke(T value) throws Exception {
+ synchronized (resultList()) {
+ resultList().add(value);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+ private List<T> resultList() {
+ synchronized (TestListWrapper.getInstance()) {
+ return (List<T>) TestListWrapper.getInstance().getList(resultListId);
+ }
+ }
+
+ public List<T> getResult() {
+ synchronized (resultList()) {
+ ArrayList<T> copiedList = new ArrayList<T>(resultList());
+ return copiedList;
+ }
+ }
+
+ public List<T> getSortedResult() {
+ synchronized (resultList()) {
+ TreeSet<T> treeSet = new TreeSet<T>(resultList());
+ ArrayList<T> sortedList = new ArrayList<T>(treeSet);
+ return sortedList;
+ }
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b4ee2a1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
new file mode 100644
index 0000000..3c50f63
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestListWrapper {
+
+ private static TestListWrapper instance;
+
+ private List<List<? extends Comparable>> lists;
+
+ private TestListWrapper() {
+ lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>());
+ }
+
+ public static TestListWrapper getInstance() {
+ if (instance == null) {
+ instance = new TestListWrapper();
+ }
+ return instance;
+ }
+
+ /**
+ * Creates and stores a list, returns with the id.
+ *
+ * @return The ID of the list.
+ */
+ public int createList() {
+ lists.add(new ArrayList<Comparable>());
+ return lists.size() - 1;
+ }
+
+ public List<?> getList(int listId) {
+ List<? extends Comparable> list = lists.get(listId);
+ if (list == null) {
+ throw new RuntimeException("No such list.");
+ }
+
+ return list;
+ }
+
+}
\ No newline at end of file