You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 21:42:25 UTC
[2/7] incubator-flink git commit: [streaming] [examples] Refactor and
packaging for windowing examples
[streaming] [examples] Refactor and packaging for windowing examples
The current examples showcase the API; more meaningful examples are coming for the 0.9 release.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e34aca75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e34aca75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e34aca75
Branch: refs/heads/release-0.8
Commit: e34aca7545f4900725b470ff1ab2db4b48c2275f
Parents: a33ad5d
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 16 21:00:31 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:08 2014 +0100
----------------------------------------------------------------------
.../flink-streaming-examples/pom.xml | 96 ++++++++++-
.../examples/iteration/IterateExample.java | 12 +-
.../streaming/examples/join/WindowJoin.java | 168 +++++++++++++++++++
.../examples/window/join/WindowJoin.java | 165 ------------------
.../examples/windowing/DeltaExtractExample.java | 77 +++++++--
.../windowing/MultiplePoliciesExample.java | 104 ++++++++----
.../examples/windowing/SlidingExample.java | 99 +++++++----
.../windowing/TimeWindowingExample.java | 98 ++++++++---
8 files changed, 547 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 1369828..d2d2b93 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -172,12 +172,12 @@ under the License.
<archive>
<manifestEntries>
- <program-class>org.apache.flink.streaming.examples.window.join.WindowJoin</program-class>
+ <program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
</manifestEntries>
</archive>
<includes>
- <include>org/apache/flink/streaming/examples/window/join/*.class</include>
+ <include>org/apache/flink/streaming/examples/join/*.class</include>
</includes>
</configuration>
</execution>
@@ -252,6 +252,98 @@ under the License.
</includes>
</configuration>
</execution>
+
+ <!-- DeltaExtract -->
+ <execution>
+ <id>DeltaExract</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>DeltaExract</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include>
+ <include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- MultiplePolicies -->
+ <execution>
+ <id>MultiplePolicies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>MultiplePolicies</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include>
+ <include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- SlidingExample -->
+ <execution>
+ <id>SlidingExample</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>SlidingExample</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include>
+ <include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- TimeWindowing -->
+ <execution>
+ <id>TimeWindowing</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>TimeWindowing</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include>
+ <include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 54dbdb0..8fb42d6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -21,9 +21,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
@@ -109,12 +110,12 @@ public class IterateExample {
* Iteration step function which takes an input (Double , Integer) and
* produces an output (Double + random, Integer + 1).
*/
- public static class Step implements
- MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
+ public static class Step extends
+ RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
private static final long serialVersionUID = 1L;
- private Random rnd;
+ private transient Random rnd;
- public Step() {
+ public void open(Configuration parameters) {
rnd = new Random();
}
@@ -122,7 +123,6 @@ public class IterateExample {
public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception {
return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
}
-
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
new file mode 100644
index 0000000..93df823
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ *
+ * <p>
+ * This example will join two streams with a sliding window: one which emits
+ * grades and one which emits salaries of people.
+ * </p>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ */
+public class WindowJoin {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // obtain execution environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // connect to the data sources for grades and salaries
+ DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+ DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+
+ // apply a temporal join over the two streams based on the names over one
+ // second windows
+ DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+ .join(salaries)
+ .onWindow(1000)
+ .where(0)
+ .equalTo(0);
+
+ // emit result
+ if (fileOutput) {
+ joinedStream.writeAsText(outputPath, 1);
+ } else {
+ joinedStream.print();
+ }
+
+ // execute program
+ env.execute("Windowed Join Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+ private final static int GRADE_COUNT = 5;
+ private final static int SALARY_MAX = 10000;
+ private final static int SLEEP_TIME = 10;
+
+ /**
+ * Continuously emit tuples with random names and integers (grades).
+ */
+ public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private Random rand;
+ private Tuple2<String, Integer> outTuple;
+
+ public GradeSource() {
+ rand = new Random();
+ outTuple = new Tuple2<String, Integer>();
+ }
+
+ @Override
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+ while (true) {
+ outTuple.f0 = names[rand.nextInt(names.length)];
+ outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+ out.collect(outTuple);
+ Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+ }
+ }
+ }
+
+ /**
+ * Continuously emit tuples with random names and integers (salaries).
+ */
+ public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private transient Random rand;
+ private transient Tuple2<String, Integer> outTuple;
+
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ rand = new Random();
+ outTuple = new Tuple2<String, Integer>();
+ }
+
+ @Override
+ public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+ while (true) {
+ outTuple.f0 = names[rand.nextInt(names.length)];
+ outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+ out.collect(outTuple);
+ Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+ }
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: WindowJoin <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing WindowJoin with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: WindowJoin <result path>");
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
deleted file mode 100644
index d5f921e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Example illustrating join over sliding windows of streams in Flink.
- *
- * <p>
- * his example will join two streams with a sliding window. One which emits
- * grades and one which emits salaries of people.
- * </p>
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
- */
-public class WindowJoin {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- // obtain execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // connect to the data sources for grades and salaries
- DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
- DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
-
- // apply a temporal join over the two stream based on the names over one
- // second windows
- DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
- .join(salaries)
- .onWindow(1000)
- .where(0)
- .equalTo(0);
-
- // emit result
- if (fileOutput) {
- joinedStream.writeAsText(outputPath, 1);
- } else {
- joinedStream.print();
- }
-
- // execute program
- env.execute("Windowed Join Example");
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
- private final static int GRADE_COUNT = 5;
- private final static int SALARY_MAX = 10000;
- private final static int SLEEP_TIME = 10;
-
- /**
- * Continuously emit tuples with random names and integers (grades).
- */
- public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private Random rand;
- private Tuple2<String, Integer> outTuple;
-
- public GradeSource() {
- rand = new Random();
- outTuple = new Tuple2<String, Integer>();
- }
-
- @Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
- while (true) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
- out.collect(outTuple);
- Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
- }
- }
- }
-
- /**
- * Continuously emit tuples with random names and integers (salaries).
- */
- public static class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private Random rand;
- private Tuple2<String, Integer> outTuple;
-
- public SalarySource() {
- rand = new Random();
- outTuple = new Tuple2<String, Integer>();
- }
-
- @Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
- while (true) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
- out.collect(outTuple);
- Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
- }
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
- if (args.length == 1) {
- outputPath = args[0];
- } else {
- System.err.println("Usage: WindowJoin <result path>");
- return false;
- }
- } else {
- System.out.println("Executing WindowJoin with generated data.");
- System.out.println(" Provide parameter to write to file.");
- System.out.println(" Usage: WindowJoin <result path>");
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
index 0622dbf..1013e6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
@@ -34,32 +34,44 @@ import org.apache.flink.util.Collector;
*/
public class DeltaExtractExample {
- private static final int PARALLELISM = 1;
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
- @SuppressWarnings({ "serial", "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
- ReduceFunction<Tuple3<Double, Double, String>> concatStrings = new ReduceFunction<Tuple3<Double, Double, String>>() {
- @Override
- public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception {
- return new Tuple3(value1.f0, value2.f1, value1.f2 + "|" + value2.f2);
- }
- };
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
DataStream dstream = env
.addSource(new CountingSource())
.window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d,
- 0d, "foo"), 1.2)).every(Count.of(2)).reduce(concatStrings);
+ 0d, "foo"), 1.2))
+ .every(Count.of(2))
+ .reduce(new ConcatStrings());
+
+ // emit result
+ if (fileOutput) {
+ dstream.writeAsText(outputPath, 1);
+ } else {
+ dstream.print();
+ }
- dstream.print();
- env.execute();
+ // execute the program
+ env.execute("Delta Extract Example");
}
- @SuppressWarnings("serial")
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
private static class CountingSource implements SourceFunction<Tuple3<Double, Double, String>> {
+ private static final long serialVersionUID = 1L;
private int counter = 0;
@@ -75,4 +87,41 @@ public class DeltaExtractExample {
}
}
+ private static final class ConcatStrings implements
+ ReduceFunction<Tuple3<Double, Double, String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple3<Double, Double, String> reduce(Tuple3<Double, Double, String> value1,
+ Tuple3<Double, Double, String> value2) throws Exception {
+ return new Tuple3<Double, Double, String>(value1.f0, value2.f1, value1.f2 + "|"
+ + value2.f2);
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: DeltaExtractExample <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing DeltaExtractExample with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: DeltaExtractExample <result path>");
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
index 9b242f6..6f031c3 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
@@ -30,57 +30,103 @@ import org.apache.flink.util.Collector;
*/
public class MultiplePoliciesExample {
- private static final int PARALLELISM = 2;
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
-
- // This reduce function does a String concat.
- GroupReduceFunction<String, String> reducer = new GroupReduceFunction<String, String>() {
-
- /**
- * Auto generates version ID
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
- String output = "|";
- for (String v : values) {
- output = output + v + "|";
- }
- out.collect(output);
- }
- };
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(new BasicSource())
.groupBy(0)
.window(Count.of(2))
.every(Count.of(3), Count.of(5))
- .reduceGroup(reducer);
+ .reduceGroup(new Concat());
- stream.print();
+ // emit result
+ if (fileOutput) {
+ stream.writeAsText(outputPath, 1);
+ } else {
+ stream.print();
+ }
- env.execute();
+ // execute the program
+ env.execute("Multiple Policies Example");
}
- public static class BasicSource implements SourceFunction<String> {
+ /**
+ * This source function indefinitely provides String inputs for the
+ * topology.
+ */
+ public static final class BasicSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
- String str1 = new String("streaming");
- String str2 = new String("flink");
+ private final static String STR_1 = new String("streaming");
+ private final static String STR_2 = new String("flink");
@Override
public void invoke(Collector<String> out) throws Exception {
// continuous emit
while (true) {
- out.collect(str1);
- out.collect(str2);
+ out.collect(STR_1);
+ out.collect(STR_2);
}
}
}
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * This reduce function does a String concat.
+ */
+ public static final class Concat implements GroupReduceFunction<String, String> {
+
+ /**
+ * Serial version UID used for Java serialization.
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
+ String output = "|";
+ for (String v : values) {
+ output = output + v + "|";
+ }
+ out.collect(output);
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: MultiplePoliciesExample <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing MultiplePoliciesExample with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: MultiplePoliciesExample <result path>");
+ }
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
index c9c78b5..cf03477 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
@@ -31,55 +31,58 @@ import org.apache.flink.util.Collector;
*/
public class SlidingExample {
- private static final int PARALLELISM = 1;
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
* SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
* buffer Resulting windows will have an overlap of 5 elements
*/
-
+
// DataStream<String> stream = env.addSource(new CountingSource())
// .window(Count.of(10))
// .every(Count.of(5))
- // .reduce(reduceFunction);
-
+ // .reduce(new Concat());
+
/*
* ADVANCED-EXAMPLE: Use this to have the last element of the last
* window as first element of the next window while the window size is
* always 5
*/
-
- // This reduce function does a String concat.
- ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
-
- /**
- * default version ID
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public String reduce(String value1, String value2) throws Exception {
- return value1 + "|" + value2;
- }
-
- };
DataStream<String> stream = env.addSource(new CountingSource())
- .window(Count.of(5).withDelete(4))
- .every(Count.of(4).startingAt(-1))
- .reduce(reduceFunction);
-
- stream.print();
+ .window(Count.of(5)
+ .withDelete(4))
+ .every(Count.of(4)
+ .startingAt(-1))
+ .reduce(new Concat());
+
+ // emit result
+ if (fileOutput) {
+ stream.writeAsText(outputPath, 1);
+ } else {
+ stream.print();
+ }
- env.execute();
+ // execute the program
+ env.execute("Sliding Example");
}
- @SuppressWarnings("serial")
- private static class CountingSource implements SourceFunction<String> {
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ private static final class CountingSource implements SourceFunction<String> {
+ private static final long serialVersionUID = 1L;
private int counter = 0;
@@ -93,6 +96,44 @@ public class SlidingExample {
collector.collect("V" + counter++);
}
}
+ }
+
+ /**
+ * Reduce function that concatenates two Strings, separated by "|".
+ */
+ private static final class Concat implements ReduceFunction<String> {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public String reduce(String value1, String value2) throws Exception {
+ return value1 + "|" + value2;
+ }
}
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: SlidingExample <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing SlidingExample with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: SlidingExample <result path>");
+ }
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index 8c26e4a..622aa82 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -21,9 +21,10 @@ import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
@@ -36,55 +37,59 @@ import org.apache.flink.util.Collector;
*/
public class TimeWindowingExample {
- private static final int PARALLELISM = 1;
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
- // Prevent output from being blocked
- env.setBufferTimeout(100);
-
- KeySelector<Integer, Integer> myKey = new KeySelector<Integer, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- if (value < 2) {
- return 0;
- } else {
- return 1;
- }
- }
+ if (!parseParameters(args)) {
+ return;
+ }
- };
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
.window(Count.of(100))
.every(Time.of(1000, TimeUnit.MILLISECONDS))
- .groupBy(myKey)
+ .groupBy(new MyKey())
.sum(0);
- stream.print();
+ // emit result
+ if (fileOutput) {
+ stream.writeAsText(outputPath, 1);
+ } else {
+ stream.print();
+ }
- env.execute();
+ // execute the program
+ env.execute("Time Windowing Example");
}
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
/**
* This data source emits one element every 0.001 sec. The output is an
* Integer counting the output elements. As soon as the counter reaches
* 10000 it is reset to 0. On each reset the source waits 5 sec. before it
* restarts to produce elements.
*/
- @SuppressWarnings("serial")
- private static class CountingSourceWithSleep implements SourceFunction<Integer> {
+ private static final class CountingSourceWithSleep extends RichSourceFunction<Integer> {
+ private static final long serialVersionUID = 1L;
private int counter = 0;
+ private transient Random rnd;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ rnd = new Random();
+ }
@Override
public void invoke(Collector<Integer> collector) throws Exception {
- Random rnd = new Random();
// continuous emit
while (true) {
if (counter > 9999) {
@@ -99,10 +104,49 @@ public class TimeWindowingExample {
// too fast for local tests and you might always see
// SUM[k=1..9999](k) as result.
Thread.sleep(1);
-
counter++;
}
}
+ }
+
+ private static final class MyKey implements KeySelector<Integer, Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ if (value < 2) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (args.length == 1) {
+ outputPath = args[0];
+ } else {
+ System.err.println("Usage: TimeWindowingExample <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing TimeWindowingExample with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: TimeWindowingExample <result path>");
+ }
+ return true;
}
}