You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:40 UTC
[08/10] flink git commit: [FLINK-1618] [streaming] Parallel time
reduce
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index d7338a0..34986c8 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Time;
@@ -129,81 +131,113 @@ public class WindowIntegrationTest implements Serializable {
source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
.getDiscretizedStream().addSink(new DistributedSink5());
+ DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(Collector<Integer> collector) throws Exception {
+ for (int i = 1; i <= 10; i++) {
+ collector.collect(i);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+
+ DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(Collector<Integer> collector) throws Exception {
+ for (int i = 1; i <= 11; i++) {
+ if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
+ collector.collect(i);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+
+ source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
+ .addSink(new DistributedSink6());
+
+ source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
+ .addSink(new DistributedSink7());
+
env.execute();
- // sum ( Time of 3 slide 2 )
- List<StreamWindow<Integer>> expected1 = new
- ArrayList<StreamWindow<Integer>>();
- expected1.add(StreamWindow.fromElements(5));
- expected1.add(StreamWindow.fromElements(11));
- expected1.add(StreamWindow.fromElements(9));
- expected1.add(StreamWindow.fromElements(10));
- expected1.add(StreamWindow.fromElements(32));
-
- validateOutput(expected1, CentralSink1.windows);
-
- // Tumbling Time of 4 grouped by mod 2
- List<StreamWindow<Integer>> expected2 = new
- ArrayList<StreamWindow<Integer>>();
- expected2.add(StreamWindow.fromElements(2, 2, 4));
- expected2.add(StreamWindow.fromElements(1, 3));
- expected2.add(StreamWindow.fromElements(5));
- expected2.add(StreamWindow.fromElements(10));
- expected2.add(StreamWindow.fromElements(11, 11));
-
- validateOutput(expected2, CentralSink2.windows);
-
- // groupby mod 2 sum ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected3 = new
- ArrayList<StreamWindow<Integer>>();
- expected3.add(StreamWindow.fromElements(4));
- expected3.add(StreamWindow.fromElements(5));
- expected3.add(StreamWindow.fromElements(22));
- expected3.add(StreamWindow.fromElements(8));
- expected3.add(StreamWindow.fromElements(10));
-
- validateOutput(expected3, DistributedSink1.windows);
-
- // groupby mod3 Tumbling Count of 2 grouped by mod 2
- List<StreamWindow<Integer>> expected4 = new
- ArrayList<StreamWindow<Integer>>();
- expected4.add(StreamWindow.fromElements(2, 2));
- expected4.add(StreamWindow.fromElements(1));
- expected4.add(StreamWindow.fromElements(4));
- expected4.add(StreamWindow.fromElements(5, 11));
- expected4.add(StreamWindow.fromElements(10));
- expected4.add(StreamWindow.fromElements(11));
- expected4.add(StreamWindow.fromElements(3));
-
- validateOutput(expected4, DistributedSink2.windows);
-
- // min ( Time of 2 slide 3 )
- List<StreamWindow<Integer>> expected5 = new
- ArrayList<StreamWindow<Integer>>();
- expected5.add(StreamWindow.fromElements(1));
- expected5.add(StreamWindow.fromElements(4));
- expected5.add(StreamWindow.fromElements(10));
-
- validateOutput(expected5, CentralSink3.windows);
-
- // groupby mod 2 max ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected6 = new
- ArrayList<StreamWindow<Integer>>();
- expected6.add(StreamWindow.fromElements(3));
- expected6.add(StreamWindow.fromElements(5));
- expected6.add(StreamWindow.fromElements(11));
- expected6.add(StreamWindow.fromElements(4));
- expected6.add(StreamWindow.fromElements(10));
-
- validateOutput(expected6, DistributedSink3.windows);
-
- List<StreamWindow<Integer>> expected7 = new
- ArrayList<StreamWindow<Integer>>();
- expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
- expected7.add(StreamWindow.fromElements(10));
- expected7.add(StreamWindow.fromElements(10, 11, 11));
-
- validateOutput(expected7, DistributedSink4.windows);
+ // sum ( Time of 3 slide 2 )
+ List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
+ expected1.add(StreamWindow.fromElements(5));
+ expected1.add(StreamWindow.fromElements(11));
+ expected1.add(StreamWindow.fromElements(9));
+ expected1.add(StreamWindow.fromElements(10));
+ expected1.add(StreamWindow.fromElements(32));
+
+ validateOutput(expected1, CentralSink1.windows);
+
+ // Tumbling Time of 4 grouped by mod 2
+ List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
+ expected2.add(StreamWindow.fromElements(2, 2, 4));
+ expected2.add(StreamWindow.fromElements(1, 3));
+ expected2.add(StreamWindow.fromElements(5));
+ expected2.add(StreamWindow.fromElements(10));
+ expected2.add(StreamWindow.fromElements(11, 11));
+
+ validateOutput(expected2, CentralSink2.windows);
+
+ // groupby mod 2 sum ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
+ expected3.add(StreamWindow.fromElements(4));
+ expected3.add(StreamWindow.fromElements(5));
+ expected3.add(StreamWindow.fromElements(22));
+ expected3.add(StreamWindow.fromElements(8));
+ expected3.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected3, DistributedSink1.windows);
+
+ // groupby mod3 Tumbling Count of 2 grouped by mod 2
+ List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
+ expected4.add(StreamWindow.fromElements(2, 2));
+ expected4.add(StreamWindow.fromElements(1));
+ expected4.add(StreamWindow.fromElements(4));
+ expected4.add(StreamWindow.fromElements(5, 11));
+ expected4.add(StreamWindow.fromElements(10));
+ expected4.add(StreamWindow.fromElements(11));
+ expected4.add(StreamWindow.fromElements(3));
+
+ validateOutput(expected4, DistributedSink2.windows);
+
+ // min ( Time of 2 slide 3 )
+ List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
+ expected5.add(StreamWindow.fromElements(1));
+ expected5.add(StreamWindow.fromElements(4));
+ expected5.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected5, CentralSink3.windows);
+
+ // groupby mod 2 max ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
+ expected6.add(StreamWindow.fromElements(3));
+ expected6.add(StreamWindow.fromElements(5));
+ expected6.add(StreamWindow.fromElements(11));
+ expected6.add(StreamWindow.fromElements(4));
+ expected6.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected6, DistributedSink3.windows);
+
+ List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
+ expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+ expected7.add(StreamWindow.fromElements(10));
+ expected7.add(StreamWindow.fromElements(10, 11, 11));
+
+ validateOutput(expected7, DistributedSink4.windows);
List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
expected8.add(StreamWindow.fromElements(4, 8));
@@ -216,6 +250,25 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected8, DistributedSink5.windows);
+ List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
+ expected9.add(StreamWindow.fromElements(6));
+ expected9.add(StreamWindow.fromElements(14));
+ expected9.add(StreamWindow.fromElements(22));
+ expected9.add(StreamWindow.fromElements(30));
+ expected9.add(StreamWindow.fromElements(38));
+
+ validateOutput(expected9, DistributedSink6.windows);
+
+ List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
+ expected10.add(StreamWindow.fromElements(6, 9));
+ expected10.add(StreamWindow.fromElements(16, 24));
+
+ for (List<Integer> sw : DistributedSink7.windows) {
+ Collections.sort(sw);
+ }
+
+ validateOutput(expected10, DistributedSink7.windows);
+
}
public static <R> void validateOutput(List<R> expected, List<R> actual) {
@@ -357,4 +410,39 @@ public class WindowIntegrationTest implements Serializable {
}
}
+
+ @SuppressWarnings("serial")
+ private static class DistributedSink6 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class DistributedSink7 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
index ac3f583..77037d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
@@ -52,15 +52,15 @@ public class WindowMergerTest {
List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
input.add(w1);
input.add(w4_1);
- input.addAll(w2.split(2));
- input.addAll(w3.partitionBy(new KeySelector<Integer, Integer>() {
+ input.addAll(StreamWindow.split(w2, 2));
+ input.addAll(StreamWindow.partitionBy(w3, new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer getKey(Integer value) throws Exception {
return value % 2;
}
- }));
+ }, false));
input.add(w4_2);
List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMerger, input);
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
index da68211..9a2416c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
@@ -42,12 +42,12 @@ public class WindowPartitionerTest {
StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.addAll(w1.split(2));
- expected1.addAll(w2.split(2));
+ expected1.addAll(StreamWindow.split(w1,2));
+ expected1.addAll(StreamWindow.split(w2,2));
List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.addAll(w1.partitionBy(new MyKey()));
- expected2.addAll(w2.partitionBy(new MyKey()));
+ expected2.addAll(StreamWindow.partitionBy(w1,new MyKey(),false));
+ expected2.addAll(StreamWindow.partitionBy(w2,new MyKey(),false));
List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
input.add(w1);
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
index a07b681..c3efc7b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
@@ -92,7 +92,7 @@ public class StreamWindowTest {
// good
}
- List<StreamWindow<Integer>> wList = merged.split(3);
+ List<StreamWindow<Integer>> wList = StreamWindow.split(merged,3);
StreamWindow<Integer> merged2 = StreamWindow.merge(wList);
@@ -133,12 +133,12 @@ public class StreamWindowTest {
streamWindow.add(5);
streamWindow.add(6);
- List<StreamWindow<Integer>> split = streamWindow.split(2);
+ List<StreamWindow<Integer>> split = StreamWindow.split(streamWindow,2);
assertEquals(2, split.size());
assertEquals(StreamWindow.fromElements(1, 2, 3), split.get(0));
assertEquals(StreamWindow.fromElements(4, 5, 6), split.get(1));
- List<StreamWindow<Integer>> split2 = streamWindow.split(6);
+ List<StreamWindow<Integer>> split2 = StreamWindow.split(streamWindow,6);
assertEquals(6, split2.size());
assertEquals(StreamWindow.fromElements(1), split2.get(0));
assertEquals(StreamWindow.fromElements(2), split2.get(1));
@@ -147,7 +147,7 @@ public class StreamWindowTest {
assertEquals(StreamWindow.fromElements(5), split2.get(4));
assertEquals(StreamWindow.fromElements(6), split2.get(5));
- List<StreamWindow<Integer>> split3 = streamWindow.split(10);
+ List<StreamWindow<Integer>> split3 = StreamWindow.split(streamWindow,10);
assertEquals(6, split3.size());
assertEquals(StreamWindow.fromElements(1), split3.get(0));
assertEquals(StreamWindow.fromElements(2), split3.get(1));
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index 95aace0..55abc72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -97,6 +97,32 @@ public class TumblingGroupedPreReducerTest {
}
+ @Test
+ public void testEmitWindow2() throws Exception {
+
+ List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
+ inputs.add(new Tuple2<Integer, Integer>(1, 1));
+ inputs.add(new Tuple2<Integer, Integer>(0, 0));
+ inputs.add(new Tuple2<Integer, Integer>(1, -1));
+ inputs.add(new Tuple2<Integer, Integer>(1, -2));
+
+ TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
+
+ WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
+ reducer, key, serializer).sequentialID();
+
+ wb.store(serializer.copy(inputs.get(0)));
+ wb.store(serializer.copy(inputs.get(1)));
+ wb.emitWindow(collector);
+
+ System.out.println(collected);
+
+ wb.store(serializer.copy(inputs.get(0)));
+ wb.store(serializer.copy(inputs.get(1)));
+ wb.store(serializer.copy(inputs.get(2)));
+ }
+
private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
assertEquals(new HashSet<T>(first), new HashSet<T>(second));
}