You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:18 UTC
[13/34] incubator-flink git commit: [streaming] New windowing API
merge and cleanup + several minor fixes
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
index 069693f..9b242f6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
@@ -17,35 +17,27 @@
package org.apache.flink.streaming.examples.windowing;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.util.Collector;
-import java.util.LinkedList;
-
/**
* This example uses count-based tumbling windowing with multiple trigger
* policies at the same time.
*/
public class MultiplePoliciesExample {
- private static final int PARALLELISM = 1;
+ private static final int PARALLELISM = 2;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
- LinkedList<TriggerPolicy<String>> policies = new LinkedList<TriggerPolicy<String>>();
- policies.add(new CountTriggerPolicy<String>(5));
- policies.add(new CountTriggerPolicy<String>(8));
-
// This reduce function does a String concat.
- ReduceFunction<String> reducer = new ReduceFunction<String>() {
+ GroupReduceFunction<String, String> reducer = new GroupReduceFunction<String, String>() {
/**
* Auto-generated version ID
@@ -53,14 +45,21 @@ public class MultiplePoliciesExample {
private static final long serialVersionUID = 1L;
@Override
- public String reduce(String value1, String value2) throws Exception {
- return value1 + "|" + value2;
+ public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
+ String output = "|";
+ for (String v : values) {
+ output = output + v + "|";
+ }
+ out.collect(output);
}
};
- DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource()).window(
- policies, reducer);
+ DataStream<String> stream = env.addSource(new BasicSource())
+ .groupBy(0)
+ .window(Count.of(2))
+ .every(Count.of(3), Count.of(5))
+ .reduceGroup(reducer);
stream.print();
@@ -70,13 +69,16 @@ public class MultiplePoliciesExample {
public static class BasicSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
- String str = new String("streaming");
+
+ String str1 = new String("streaming");
+ String str2 = new String("flink");
@Override
public void invoke(Collector<String> out) throws Exception {
// continuous emit
while (true) {
- out.collect(str);
+ out.collect(str1);
+ out.collect(str2);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
index 1768480..c9c78b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
@@ -18,14 +18,10 @@
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.util.Collector;
/**
@@ -45,19 +41,18 @@ public class SlidingExample {
* SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
* buffer. Resulting windows will have an overlap of 5 elements.
*/
- // TriggerPolicy<String> triggerPolicy=new
- // CountTriggerPolicy<String>(5);
- // EvictionPolicy<String> evictionPolicy=new
- // CountEvictionPolicy<String>(10);
-
+
+ // DataStream<String> stream = env.addSource(new CountingSource())
+ // .window(Count.of(10))
+ // .every(Count.of(5))
+ // .reduce(reduceFunction);
+
/*
* ADVANCED-EXAMPLE: Use this to have the last element of the previous
* window as the first element of the next window, while the window size
* is always 5.
*/
- TriggerPolicy<String> triggerPolicy = new CountTriggerPolicy<String>(4, -1);
- EvictionPolicy<String> evictionPolicy = new CountEvictionPolicy<String>(5, 4);
-
+
// This reduce function does a String concat.
ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
@@ -73,8 +68,10 @@ public class SlidingExample {
};
- DataStream<Tuple2<String, String[]>> stream = env.addSource(new CountingSource()).window(
- triggerPolicy, evictionPolicy, reduceFunction);
+ DataStream<String> stream = env.addSource(new CountingSource())
+ .window(Count.of(5).withDelete(4))
+ .every(Count.of(4).startingAt(-1))
+ .reduce(reduceFunction);
stream.print();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index 115363b..fba73be 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -17,27 +17,26 @@
package org.apache.flink.streaming.examples.windowing;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.util.Collector;
/**
* This example shows the functionality of time based windows. It utilizes the
- * {@link ActiveTriggerPolicy} implementation in the {@link TimeTriggerPolicy}.
+ * {@link ActiveTriggerPolicy} implementation in the
+ * {@link ActiveTimeTriggerPolicy}.
*/
public class TimeWindowingExample {
- private static final int PARALLELISM = 1;
+ private static final int PARALLELISM = 2;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
@@ -46,38 +45,21 @@ public class TimeWindowingExample {
// Prevent output from being blocked
env.setBufferTimeout(100);
- // Trigger every 1000ms
- TriggerPolicy<Integer> triggerPolicy = new TimeTriggerPolicy<Integer>(1000L,
- new DefaultTimeStamp<Integer>(), new Extractor<Long, Integer>() {
+ DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
+ .groupBy(new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
- public Integer extract(Long in) {
- return in.intValue();
+ public Integer getKey(Integer value) throws Exception {
+ if (value < 3) {
+ return 0;
+ } else {
+ return 1;
+ }
}
- });
-
- // Always keep the newest 100 elements in the buffer
- EvictionPolicy<Integer> evictionPolicy = new CountEvictionPolicy<Integer>(100);
-
- // This reduce function does a String concat.
- ReduceFunction<Integer> reduceFunction = new ReduceFunction<Integer>() {
-
- /**
- * default version ID
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- };
-
- DataStream<Tuple2<Integer, String[]>> stream = env.addSource(new CountingSourceWithSleep()).window(triggerPolicy, evictionPolicy, reduceFunction);
+ }).window(Count.of(100)).every(Time.of(1000, TimeUnit.MILLISECONDS)).sum();
stream.print();
@@ -97,6 +79,7 @@ public class TimeWindowingExample {
@Override
public void invoke(Collector<Integer> collector) throws Exception {
+ Random rnd = new Random();
// continuous emit
while (true) {
if (counter > 9999) {
@@ -105,7 +88,7 @@ public class TimeWindowingExample {
System.out.println("Source continouse with emitting now!");
counter = 0;
}
- collector.collect(counter);
+ collector.collect(rnd.nextInt(9) + 1);
// Wait 0.001 sec. before the next emit. Otherwise the source is
// too fast for local tests and you might always see