You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:40 UTC

[08/10] flink git commit: [FLINK-1618] [streaming] Parallel time reduce

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index d7338a0..34986c8 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Time;
@@ -129,81 +131,113 @@ public class WindowIntegrationTest implements Serializable {
 		source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
 				.getDiscretizedStream().addSink(new DistributedSink5());
 
+		DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(Collector<Integer> collector) throws Exception {
+				for (int i = 1; i <= 10; i++) {
+					collector.collect(i);
+				}
+			}
+
+			@Override
+			public void cancel() {
+			}
+		});
+
+		DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(Collector<Integer> collector) throws Exception {
+				for (int i = 1; i <= 11; i++) {
+					if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
+						collector.collect(i);
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+			}
+		});
+
+		source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
+				.addSink(new DistributedSink6());
+
+		source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
+				.addSink(new DistributedSink7());
+
 		env.execute();
 
-		 // sum ( Time of 3 slide 2 )
-		 List<StreamWindow<Integer>> expected1 = new
-		 ArrayList<StreamWindow<Integer>>();
-		 expected1.add(StreamWindow.fromElements(5));
-		 expected1.add(StreamWindow.fromElements(11));
-		 expected1.add(StreamWindow.fromElements(9));
-		 expected1.add(StreamWindow.fromElements(10));
-		 expected1.add(StreamWindow.fromElements(32));
-		
-		 validateOutput(expected1, CentralSink1.windows);
-		
-		 // Tumbling Time of 4 grouped by mod 2
-		 List<StreamWindow<Integer>> expected2 = new
-		 ArrayList<StreamWindow<Integer>>();
-		 expected2.add(StreamWindow.fromElements(2, 2, 4));
-		 expected2.add(StreamWindow.fromElements(1, 3));
-		 expected2.add(StreamWindow.fromElements(5));
-		 expected2.add(StreamWindow.fromElements(10));
-		 expected2.add(StreamWindow.fromElements(11, 11));
-		
-		 validateOutput(expected2, CentralSink2.windows);
-		
-		 // groupby mod 2 sum ( Tumbling Time of 4)
-		 List<StreamWindow<Integer>> expected3 = new
-		 ArrayList<StreamWindow<Integer>>();
-		 expected3.add(StreamWindow.fromElements(4));
-		 expected3.add(StreamWindow.fromElements(5));
-		 expected3.add(StreamWindow.fromElements(22));
-		 expected3.add(StreamWindow.fromElements(8));
-		 expected3.add(StreamWindow.fromElements(10));
-		
-		 validateOutput(expected3, DistributedSink1.windows);
-		
-		 // groupby mod3 Tumbling Count of 2 grouped by mod 2
-		 List<StreamWindow<Integer>> expected4 = new
-		 ArrayList<StreamWindow<Integer>>();
-		 expected4.add(StreamWindow.fromElements(2, 2));
-		 expected4.add(StreamWindow.fromElements(1));
-		 expected4.add(StreamWindow.fromElements(4));
-		 expected4.add(StreamWindow.fromElements(5, 11));
-		 expected4.add(StreamWindow.fromElements(10));
-		 expected4.add(StreamWindow.fromElements(11));
-		 expected4.add(StreamWindow.fromElements(3));
-		
-		 validateOutput(expected4, DistributedSink2.windows);
-		
-		 // min ( Time of 2 slide 3 )
-		 List<StreamWindow<Integer>> expected5 = new
-		 ArrayList<StreamWindow<Integer>>();
-		 expected5.add(StreamWindow.fromElements(1));
-		 expected5.add(StreamWindow.fromElements(4));
-		 expected5.add(StreamWindow.fromElements(10));
-		
-		 validateOutput(expected5, CentralSink3.windows);
-		
-		 // groupby mod 2 max ( Tumbling Time of 4)
-		 List<StreamWindow<Integer>> expected6 = new
-		 ArrayList<StreamWindow<Integer>>();
-		 expected6.add(StreamWindow.fromElements(3));
-		 expected6.add(StreamWindow.fromElements(5));
-		 expected6.add(StreamWindow.fromElements(11));
-		 expected6.add(StreamWindow.fromElements(4));
-		 expected6.add(StreamWindow.fromElements(10));
-		
-		 validateOutput(expected6, DistributedSink3.windows);
-		
-		 List<StreamWindow<Integer>> expected7 = new
-		 ArrayList<StreamWindow<Integer>>();
-		 expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
-		 expected7.add(StreamWindow.fromElements(10));
-		 expected7.add(StreamWindow.fromElements(10, 11, 11));
-		
-		 validateOutput(expected7, DistributedSink4.windows);
+		// sum ( Time of 3 slide 2 )
+		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
+		expected1.add(StreamWindow.fromElements(5));
+		expected1.add(StreamWindow.fromElements(11));
+		expected1.add(StreamWindow.fromElements(9));
+		expected1.add(StreamWindow.fromElements(10));
+		expected1.add(StreamWindow.fromElements(32));
+
+		validateOutput(expected1, CentralSink1.windows);
+
+		// Tumbling Time of 4 grouped by mod 2
+		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
+		expected2.add(StreamWindow.fromElements(2, 2, 4));
+		expected2.add(StreamWindow.fromElements(1, 3));
+		expected2.add(StreamWindow.fromElements(5));
+		expected2.add(StreamWindow.fromElements(10));
+		expected2.add(StreamWindow.fromElements(11, 11));
+
+		validateOutput(expected2, CentralSink2.windows);
+
+		// groupby mod 2 sum ( Tumbling Time of 4)
+		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
+		expected3.add(StreamWindow.fromElements(4));
+		expected3.add(StreamWindow.fromElements(5));
+		expected3.add(StreamWindow.fromElements(22));
+		expected3.add(StreamWindow.fromElements(8));
+		expected3.add(StreamWindow.fromElements(10));
+
+		validateOutput(expected3, DistributedSink1.windows);
+
+		// groupby mod3 Tumbling Count of 2 grouped by mod 2
+		List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
+		expected4.add(StreamWindow.fromElements(2, 2));
+		expected4.add(StreamWindow.fromElements(1));
+		expected4.add(StreamWindow.fromElements(4));
+		expected4.add(StreamWindow.fromElements(5, 11));
+		expected4.add(StreamWindow.fromElements(10));
+		expected4.add(StreamWindow.fromElements(11));
+		expected4.add(StreamWindow.fromElements(3));
+
+		validateOutput(expected4, DistributedSink2.windows);
+
+		// min ( Time of 2 slide 3 )
+		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
+		expected5.add(StreamWindow.fromElements(1));
+		expected5.add(StreamWindow.fromElements(4));
+		expected5.add(StreamWindow.fromElements(10));
+
+		validateOutput(expected5, CentralSink3.windows);
+
+		// groupby mod 2 max ( Tumbling Time of 4)
+		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
+		expected6.add(StreamWindow.fromElements(3));
+		expected6.add(StreamWindow.fromElements(5));
+		expected6.add(StreamWindow.fromElements(11));
+		expected6.add(StreamWindow.fromElements(4));
+		expected6.add(StreamWindow.fromElements(10));
+
+		validateOutput(expected6, DistributedSink3.windows);
+
+		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
+		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+		expected7.add(StreamWindow.fromElements(10));
+		expected7.add(StreamWindow.fromElements(10, 11, 11));
+
+		validateOutput(expected7, DistributedSink4.windows);
 
 		List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
 		expected8.add(StreamWindow.fromElements(4, 8));
@@ -216,6 +250,25 @@ public class WindowIntegrationTest implements Serializable {
 
 		validateOutput(expected8, DistributedSink5.windows);
 
+		List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
+		expected9.add(StreamWindow.fromElements(6));
+		expected9.add(StreamWindow.fromElements(14));
+		expected9.add(StreamWindow.fromElements(22));
+		expected9.add(StreamWindow.fromElements(30));
+		expected9.add(StreamWindow.fromElements(38));
+
+		validateOutput(expected9, DistributedSink6.windows);
+
+		List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
+		expected10.add(StreamWindow.fromElements(6, 9));
+		expected10.add(StreamWindow.fromElements(16, 24));
+
+		for (List<Integer> sw : DistributedSink7.windows) {
+			Collections.sort(sw);
+		}
+
+		validateOutput(expected10, DistributedSink7.windows);
+
 	}
 
 	public static <R> void validateOutput(List<R> expected, List<R> actual) {
@@ -357,4 +410,39 @@ public class WindowIntegrationTest implements Serializable {
 		}
 
 	}
+
+	@SuppressWarnings("serial")
+	private static class DistributedSink6 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+		@Override
+		public void cancel() {
+		}
+
+	}
+
+	@SuppressWarnings("serial")
+	private static class DistributedSink7 implements SinkFunction<StreamWindow<Integer>> {
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+		@Override
+		public void cancel() {
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
index ac3f583..77037d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMergerTest.java
@@ -52,15 +52,15 @@ public class WindowMergerTest {
 		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
 		input.add(w1);
 		input.add(w4_1);
-		input.addAll(w2.split(2));
-		input.addAll(w3.partitionBy(new KeySelector<Integer, Integer>() {
+		input.addAll(StreamWindow.split(w2, 2));
+		input.addAll(StreamWindow.partitionBy(w3, new KeySelector<Integer, Integer>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
 			public Integer getKey(Integer value) throws Exception {
 				return value % 2;
 			}
-		}));
+		}, false));
 		input.add(w4_2);
 
 		List<StreamWindow<Integer>> output = MockContext.createAndExecute(windowMerger, input);

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
index da68211..9a2416c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitionerTest.java
@@ -42,12 +42,12 @@ public class WindowPartitionerTest {
 		StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
 
 		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.addAll(w1.split(2));
-		expected1.addAll(w2.split(2));
+		expected1.addAll(StreamWindow.split(w1,2));
+		expected1.addAll(StreamWindow.split(w2,2));
 
 		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.addAll(w1.partitionBy(new MyKey()));
-		expected2.addAll(w2.partitionBy(new MyKey()));
+		expected2.addAll(StreamWindow.partitionBy(w1,new MyKey(),false));
+		expected2.addAll(StreamWindow.partitionBy(w2,new MyKey(),false));
 
 		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
 		input.add(w1);

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
index a07b681..c3efc7b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTest.java
@@ -92,7 +92,7 @@ public class StreamWindowTest {
 			// good
 		}
 
-		List<StreamWindow<Integer>> wList = merged.split(3);
+		List<StreamWindow<Integer>> wList = StreamWindow.split(merged,3);
 
 		StreamWindow<Integer> merged2 = StreamWindow.merge(wList);
 
@@ -133,12 +133,12 @@ public class StreamWindowTest {
 		streamWindow.add(5);
 		streamWindow.add(6);
 
-		List<StreamWindow<Integer>> split = streamWindow.split(2);
+		List<StreamWindow<Integer>> split = StreamWindow.split(streamWindow,2);
 		assertEquals(2, split.size());
 		assertEquals(StreamWindow.fromElements(1, 2, 3), split.get(0));
 		assertEquals(StreamWindow.fromElements(4, 5, 6), split.get(1));
 
-		List<StreamWindow<Integer>> split2 = streamWindow.split(6);
+		List<StreamWindow<Integer>> split2 = StreamWindow.split(streamWindow,6);
 		assertEquals(6, split2.size());
 		assertEquals(StreamWindow.fromElements(1), split2.get(0));
 		assertEquals(StreamWindow.fromElements(2), split2.get(1));
@@ -147,7 +147,7 @@ public class StreamWindowTest {
 		assertEquals(StreamWindow.fromElements(5), split2.get(4));
 		assertEquals(StreamWindow.fromElements(6), split2.get(5));
 
-		List<StreamWindow<Integer>> split3 = streamWindow.split(10);
+		List<StreamWindow<Integer>> split3 = StreamWindow.split(streamWindow,10);
 		assertEquals(6, split3.size());
 		assertEquals(StreamWindow.fromElements(1), split3.get(0));
 		assertEquals(StreamWindow.fromElements(2), split3.get(1));

http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index 95aace0..55abc72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -97,6 +97,32 @@ public class TumblingGroupedPreReducerTest {
 
 	}
 
+	@Test
+	public void testEmitWindow2() throws Exception {
+
+		List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
+		inputs.add(new Tuple2<Integer, Integer>(1, 1));
+		inputs.add(new Tuple2<Integer, Integer>(0, 0));
+		inputs.add(new Tuple2<Integer, Integer>(1, -1));
+		inputs.add(new Tuple2<Integer, Integer>(1, -2));
+
+		TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+		List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
+
+		WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
+				reducer, key, serializer).sequentialID();
+
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.emitWindow(collector);
+
+		System.out.println(collected);
+
+		wb.store(serializer.copy(inputs.get(0)));
+		wb.store(serializer.copy(inputs.get(1)));
+		wb.store(serializer.copy(inputs.get(2)));
+	}
+
 	private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
 		assertEquals(new HashSet<T>(first), new HashSet<T>(second));
 	}