You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:18 UTC

[13/34] incubator-flink git commit: [streaming] New windowing API merge and cleanup + several minor fixes

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
index 069693f..9b242f6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
@@ -17,35 +17,27 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.util.Collector;
 
-import java.util.LinkedList;
-
 /**
  * This example uses count based tumbling windowing with multiple eviction
  * policies at the same time.
  */
 public class MultiplePoliciesExample {
 
-	private static final int PARALLELISM = 1;
+	private static final int PARALLELISM = 2;
 
 	public static void main(String[] args) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 
-		LinkedList<TriggerPolicy<String>> policies = new LinkedList<TriggerPolicy<String>>();
-		policies.add(new CountTriggerPolicy<String>(5));
-		policies.add(new CountTriggerPolicy<String>(8));
-
 		// This reduce function does a String concat.
-		ReduceFunction<String> reducer = new ReduceFunction<String>() {
+		GroupReduceFunction<String, String> reducer = new GroupReduceFunction<String, String>() {
 
 			/**
 			 * Auto generates version ID
@@ -53,14 +45,21 @@ public class MultiplePoliciesExample {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public String reduce(String value1, String value2) throws Exception {
-				return value1 + "|" + value2;
+			public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
+				String output = "|";
+				for (String v : values) {
+					output = output + v + "|";
+				}
+				out.collect(output);
 			}
 
 		};
 
-		DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource()).window(
-				policies, reducer);
+		DataStream<String> stream = env.addSource(new BasicSource())
+				.groupBy(0)
+				.window(Count.of(2))
+				.every(Count.of(3), Count.of(5))
+				.reduceGroup(reducer);
 
 		stream.print();
 
@@ -70,13 +69,16 @@ public class MultiplePoliciesExample {
 	public static class BasicSource implements SourceFunction<String> {
 
 		private static final long serialVersionUID = 1L;
-		String str = new String("streaming");
+
+		String str1 = new String("streaming");
+		String str2 = new String("flink");
 
 		@Override
 		public void invoke(Collector<String> out) throws Exception {
 			// continuous emit
 			while (true) {
-				out.collect(str);
+				out.collect(str1);
+				out.collect(str2);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
index 1768480..c9c78b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
@@ -18,14 +18,10 @@
 package org.apache.flink.streaming.examples.windowing;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.util.Collector;
 
 /**
@@ -45,19 +41,18 @@ public class SlidingExample {
 		 * SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
 		 * buffer Resulting windows will have an overlap of 5 elements
 		 */
-		// TriggerPolicy<String> triggerPolicy=new
-		// CountTriggerPolicy<String>(5);
-		// EvictionPolicy<String> evictionPolicy=new
-		// CountEvictionPolicy<String>(10);
-
+		
+		// DataStream<String> stream = env.addSource(new CountingSource())
+		// .window(Count.of(10))
+		// .every(Count.of(5))
+		// .reduce(reduceFunction);
+		
 		/*
 		 * ADVANCED-EXAMPLE: Use this to have the last element of the last
 		 * window as first element of the next window while the window size is
 		 * always 5
 		 */
-		TriggerPolicy<String> triggerPolicy = new CountTriggerPolicy<String>(4, -1);
-		EvictionPolicy<String> evictionPolicy = new CountEvictionPolicy<String>(5, 4);
-
+		
 		// This reduce function does a String concat.
 		ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
 
@@ -73,8 +68,10 @@ public class SlidingExample {
 
 		};
 
-		DataStream<Tuple2<String, String[]>> stream = env.addSource(new CountingSource()).window(
-				triggerPolicy, evictionPolicy, reduceFunction);
+		DataStream<String> stream = env.addSource(new CountingSource())
+				.window(Count.of(5).withDelete(4))
+				.every(Count.of(4).startingAt(-1))
+				.reduce(reduceFunction);
 
 		stream.print();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index 115363b..fba73be 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -17,27 +17,26 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.util.Collector;
 
 /**
  * This example shows the functionality of time based windows. It utilizes the
- * {@link ActiveTriggerPolicy} implementation in the {@link TimeTriggerPolicy}.
+ * {@link ActiveTriggerPolicy} implementation in the
+ * {@link ActiveTimeTriggerPolicy}.
  */
 public class TimeWindowingExample {
 
-	private static final int PARALLELISM = 1;
+	private static final int PARALLELISM = 2;
 
 	public static void main(String[] args) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
@@ -46,38 +45,21 @@ public class TimeWindowingExample {
 		// Prevent output from being blocked
 		env.setBufferTimeout(100);
 
-		// Trigger every 1000ms
-		TriggerPolicy<Integer> triggerPolicy = new TimeTriggerPolicy<Integer>(1000L,
-				new DefaultTimeStamp<Integer>(), new Extractor<Long, Integer>() {
+		DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
+				.groupBy(new KeySelector<Integer, Integer>() {
 
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public Integer extract(Long in) {
-						return in.intValue();
+					public Integer getKey(Integer value) throws Exception {
+						if (value < 3) {
+							return 0;
+						} else {
+							return 1;
+						}
 					}
 
-				});
-
-		// Always keep the newest 100 elements in the buffer
-		EvictionPolicy<Integer> evictionPolicy = new CountEvictionPolicy<Integer>(100);
-
-		// This reduce function does a String concat.
-		ReduceFunction<Integer> reduceFunction = new ReduceFunction<Integer>() {
-
-			/**
-			 * default version ID
-			 */
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer reduce(Integer value1, Integer value2) throws Exception {
-				return value1 + value2;
-			}
-
-		};
-
-		DataStream<Tuple2<Integer, String[]>> stream = env.addSource(new CountingSourceWithSleep()).window(triggerPolicy, evictionPolicy, reduceFunction);
+				}).window(Count.of(100)).every(Time.of(1000, TimeUnit.MILLISECONDS)).sum();
 
 		stream.print();
 
@@ -97,6 +79,7 @@ public class TimeWindowingExample {
 
 		@Override
 		public void invoke(Collector<Integer> collector) throws Exception {
+			Random rnd = new Random();
 			// continuous emit
 			while (true) {
 				if (counter > 9999) {
@@ -105,7 +88,7 @@ public class TimeWindowingExample {
 					System.out.println("Source continouse with emitting now!");
 					counter = 0;
 				}
-				collector.collect(counter);
+				collector.collect(rnd.nextInt(9) + 1);
 
 				// Wait 0.001 sec. before the next emit. Otherwise the source is
 				// too fast for local tests and you might always see