You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 21:42:25 UTC

[2/7] incubator-flink git commit: [streaming] [examples] Refactor and packaging for windowing examples

[streaming] [examples] Refactor and packaging for windowing examples

The current examples showcase the API; more meaningful examples are coming for the 0.9 release.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e34aca75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e34aca75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e34aca75

Branch: refs/heads/release-0.8
Commit: e34aca7545f4900725b470ff1ab2db4b48c2275f
Parents: a33ad5d
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 16 21:00:31 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:08 2014 +0100

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            |  96 ++++++++++-
 .../examples/iteration/IterateExample.java      |  12 +-
 .../streaming/examples/join/WindowJoin.java     | 168 +++++++++++++++++++
 .../examples/window/join/WindowJoin.java        | 165 ------------------
 .../examples/windowing/DeltaExtractExample.java |  77 +++++++--
 .../windowing/MultiplePoliciesExample.java      | 104 ++++++++----
 .../examples/windowing/SlidingExample.java      |  99 +++++++----
 .../windowing/TimeWindowingExample.java         |  98 ++++++++---
 8 files changed, 547 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 1369828..d2d2b93 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -172,12 +172,12 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.window.join.WindowJoin</program-class>
+									<program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/window/join/*.class</include>			
+								<include>org/apache/flink/streaming/examples/join/*.class</include>			
 							</includes>
 						</configuration>
 					</execution>
@@ -252,6 +252,98 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
+
+					<!-- DeltaExract -->
+					<execution>
+						<id>DeltaExract</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>DeltaExract</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- MultiplePolicies -->
+					<execution>
+						<id>MultiplePolicies</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>MultiplePolicies</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SlidingExample -->
+					<execution>
+						<id>SlidingExample</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SlidingExample</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- TimeWindowing -->
+					<execution>
+						<id>TimeWindowing</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>TimeWindowing</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 		</plugins>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 54dbdb0..8fb42d6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -21,9 +21,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
@@ -109,12 +110,12 @@ public class IterateExample {
 	 * Iteration step function which takes an input (Double , Integer) and
 	 * produces an output (Double + random, Integer + 1).
 	 */
-	public static class Step implements
-			MapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
+	public static class Step extends
+			RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
 		private static final long serialVersionUID = 1L;
-		private Random rnd;
+		private transient Random rnd;
 
-		public Step() {
+		public void open(Configuration parameters) {
 			rnd = new Random();
 		}
 
@@ -122,7 +123,6 @@ public class IterateExample {
 		public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception {
 			return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
 		}
-
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
new file mode 100644
index 0000000..93df823
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ * 
+ * <p>
+ * This example will join two streams with a sliding window: one which emits
+ * grades and one which emits salaries of people.
+ * </p>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ */
+public class WindowJoin {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// connect to the data sources for grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+
+		// apply a temporal join over the two streams based on the names over one
+		// second windows
+		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
+				.join(salaries)
+				.onWindow(1000)
+				.where(0)
+				.equalTo(0);
+
+		// emit result
+		if (fileOutput) {
+			joinedStream.writeAsText(outputPath, 1);
+		} else {
+			joinedStream.print();
+		}
+
+		// execute program
+		env.execute("Windowed Join Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+	private final static int GRADE_COUNT = 5;
+	private final static int SALARY_MAX = 10000;
+	private final static int SLEEP_TIME = 10;
+
+	/**
+	 * Continuously emit tuples with random names and integers (grades).
+	 */
+	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple2<String, Integer> outTuple;
+
+		public GradeSource() {
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	/**
+	 * Continuously emit tuples with random names and integers (salaries).
+	 */
+	public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private transient Random rand;
+		private transient Tuple2<String, Integer> outTuple;
+
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: WindowJoin <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowJoin with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: WindowJoin <result path>");
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
deleted file mode 100644
index d5f921e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoin.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.window.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Example illustrating join over sliding windows of streams in Flink.
- * 
- * <p>
- * his example will join two streams with a sliding window. One which emits
- * grades and one which emits salaries of people.
- * </p>
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
- */
-public class WindowJoin {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// obtain execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// connect to the data sources for grades and salaries
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
-
-		// apply a temporal join over the two stream based on the names over one
-		// second windows
-		DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
-				.join(salaries)
-				.onWindow(1000)
-				.where(0)
-				.equalTo(0);
-
-		// emit result
-		if (fileOutput) {
-			joinedStream.writeAsText(outputPath, 1);
-		} else {
-			joinedStream.print();
-		}
-
-		// execute program
-		env.execute("Windowed Join Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
-	private final static int GRADE_COUNT = 5;
-	private final static int SALARY_MAX = 10000;
-	private final static int SLEEP_TIME = 10;
-
-	/**
-	 * Continuously emit tuples with random names and integers (grades).
-	 */
-	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Random rand;
-		private Tuple2<String, Integer> outTuple;
-
-		public GradeSource() {
-			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
-		}
-
-		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-			while (true) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
-				out.collect(outTuple);
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			}
-		}
-	}
-
-	/**
-	 * Continuously emit tuples with random names and integers (salaries).
-	 */
-	public static class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Random rand;
-		private Tuple2<String, Integer> outTuple;
-
-		public SalarySource() {
-			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
-		}
-
-		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-			while (true) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
-				out.collect(outTuple);
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			}
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: WindowJoin <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WindowJoin with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: WindowJoin <result path>");
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
index 0622dbf..1013e6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
@@ -34,32 +34,44 @@ import org.apache.flink.util.Collector;
  */
 public class DeltaExtractExample {
 
-	private static final int PARALLELISM = 1;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
-	@SuppressWarnings({ "serial", "rawtypes", "unchecked" })
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
 
-		ReduceFunction<Tuple3<Double, Double, String>> concatStrings = new ReduceFunction<Tuple3<Double, Double, String>>() {
-			@Override
-			public Tuple3 reduce(Tuple3 value1, Tuple3 value2) throws Exception {
-				return new Tuple3(value1.f0, value2.f1, value1.f2 + "|" + value2.f2);
-			}
-		};
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+		@SuppressWarnings({ "unchecked", "rawtypes" })
 		DataStream dstream = env
 				.addSource(new CountingSource())
 				.window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d,
-						0d, "foo"), 1.2)).every(Count.of(2)).reduce(concatStrings);
+						0d, "foo"), 1.2))
+				.every(Count.of(2))
+				.reduce(new ConcatStrings());
+
+		// emit result
+		if (fileOutput) {
+			dstream.writeAsText(outputPath, 1);
+		} else {
+			dstream.print();
+		}
 
-		dstream.print();
-		env.execute();
+		// execute the program
+		env.execute("Delta Extract Example");
 
 	}
 
-	@SuppressWarnings("serial")
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
 	private static class CountingSource implements SourceFunction<Tuple3<Double, Double, String>> {
+		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
 
@@ -75,4 +87,41 @@ public class DeltaExtractExample {
 		}
 	}
 
+	private static final class ConcatStrings implements
+			ReduceFunction<Tuple3<Double, Double, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Double, Double, String> reduce(Tuple3<Double, Double, String> value1,
+				Tuple3<Double, Double, String> value2) throws Exception {
+			return new Tuple3<Double, Double, String>(value1.f0, value2.f1, value1.f2 + "|"
+					+ value2.f2);
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: DeltaExtractExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing DeltaExtractExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: DeltaExtractExample <result path>");
+		}
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
index 9b242f6..6f031c3 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
@@ -30,57 +30,103 @@ import org.apache.flink.util.Collector;
  */
 public class MultiplePoliciesExample {
 
-	private static final int PARALLELISM = 2;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		// This reduce function does a String concat.
-		GroupReduceFunction<String, String> reducer = new GroupReduceFunction<String, String>() {
-
-			/**
-			 * Auto generates version ID
-			 */
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
-				String output = "|";
-				for (String v : values) {
-					output = output + v + "|";
-				}
-				out.collect(output);
-			}
 
-		};
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<String> stream = env.addSource(new BasicSource())
 				.groupBy(0)
 				.window(Count.of(2))
 				.every(Count.of(3), Count.of(5))
-				.reduceGroup(reducer);
+				.reduceGroup(new Concat());
 
-		stream.print();
+		// emit result
+		if (fileOutput) {
+			stream.writeAsText(outputPath, 1);
+		} else {
+			stream.print();
+		}
 
-		env.execute();
+		// execute the program
+		env.execute("Multiple Policies Example");
 	}
 
-	public static class BasicSource implements SourceFunction<String> {
+	/**
+	 * This source function indefinitely provides String inputs for the
+	 * topology.
+	 */
+	public static final class BasicSource implements SourceFunction<String> {
 
 		private static final long serialVersionUID = 1L;
 
-		String str1 = new String("streaming");
-		String str2 = new String("flink");
+		private final static String STR_1 = new String("streaming");
+		private final static String STR_2 = new String("flink");
 
 		@Override
 		public void invoke(Collector<String> out) throws Exception {
 			// continuous emit
 			while (true) {
-				out.collect(str1);
-				out.collect(str2);
+				out.collect(STR_1);
+				out.collect(STR_2);
 			}
 		}
 	}
 
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * This reduce function does a String concat.
+	 */
+	public static final class Concat implements GroupReduceFunction<String, String> {
+
+		/**
+		 * Auto-generated version ID
+		 */
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
+			String output = "|";
+			for (String v : values) {
+				output = output + v + "|";
+			}
+			out.collect(output);
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: MultiplePoliciesExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing MultiplePoliciesExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: MultiplePoliciesExample <result path>");
+		}
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
index c9c78b5..cf03477 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
@@ -31,55 +31,58 @@ import org.apache.flink.util.Collector;
  */
 public class SlidingExample {
 
-	private static final int PARALLELISM = 1;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		/*
 		 * SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
 		 * buffer Resulting windows will have an overlap of 5 elements
 		 */
-		
+
 		// DataStream<String> stream = env.addSource(new CountingSource())
 		// .window(Count.of(10))
 		// .every(Count.of(5))
-		// .reduce(reduceFunction);
-		
+		// .reduce(new Concat());
+
 		/*
 		 * ADVANCED-EXAMPLE: Use this to have the last element of the last
 		 * window as first element of the next window while the window size is
 		 * always 5
 		 */
-		
-		// This reduce function does a String concat.
-		ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
-
-			/**
-			 * default version ID
-			 */
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String reduce(String value1, String value2) throws Exception {
-				return value1 + "|" + value2;
-			}
-
-		};
 
 		DataStream<String> stream = env.addSource(new CountingSource())
-				.window(Count.of(5).withDelete(4))
-				.every(Count.of(4).startingAt(-1))
-				.reduce(reduceFunction);
-
-		stream.print();
+				.window(Count.of(5)
+				.withDelete(4))
+				.every(Count.of(4)
+				.startingAt(-1))
+				.reduce(new Concat());
+
+		// emit result
+		if (fileOutput) {
+			stream.writeAsText(outputPath, 1);
+		} else {
+			stream.print();
+		}
 
-		env.execute();
+		// execute the program
+		env.execute("Sliding Example");
 	}
 
-	@SuppressWarnings("serial")
-	private static class CountingSource implements SourceFunction<String> {
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private static final class CountingSource implements SourceFunction<String> {
+		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
 
@@ -93,6 +96,44 @@ public class SlidingExample {
 				collector.collect("V" + counter++);
 			}
 		}
+	}
+
+	/**
+	 * Reduce function that concatenates two strings, separated by a '|'.
+	 */
+	private static final class Concat implements ReduceFunction<String> {
+		private static final long serialVersionUID = 1L;
 
+		@Override
+		public String reduce(String value1, String value2) throws Exception {
+			return value1 + "|" + value2;
+		}
 	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: SlidingExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing SlidingExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: SlidingExample <result path>");
+		}
+		return true;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e34aca75/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index 8c26e4a..622aa82 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -21,9 +21,10 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
@@ -36,55 +37,59 @@ import org.apache.flink.util.Collector;
  */
 public class TimeWindowingExample {
 
-	private static final int PARALLELISM = 1;
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
 
-		// Prevent output from being blocked
-		env.setBufferTimeout(100);
-
-		KeySelector<Integer, Integer> myKey = new KeySelector<Integer, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				if (value < 2) {
-					return 0;
-				} else {
-					return 1;
-				}
-			}
+		if (!parseParameters(args)) {
+			return;
+		}
 
-		};
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
 				.window(Count.of(100))
 				.every(Time.of(1000, TimeUnit.MILLISECONDS))
-				.groupBy(myKey)
+				.groupBy(new MyKey())
 				.sum(0);
 
-		stream.print();
+		// emit result
+		if (fileOutput) {
+			stream.writeAsText(outputPath, 1);
+		} else {
+			stream.print();
+		}
 
-		env.execute();
+		// execute the program
+		env.execute("Time Windowing Example");
 	}
 
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
 	/**
 	 * This data source emit one element every 0.001 sec. The output is an
 	 * Integer counting the output elements. As soon as the counter reaches
 	 * 10000 it is reset to 0. On each reset the source waits 5 sec. before it
 	 * restarts to produce elements.
 	 */
-	@SuppressWarnings("serial")
-	private static class CountingSourceWithSleep implements SourceFunction<Integer> {
+	private static final class CountingSourceWithSleep extends RichSourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
+		private transient Random rnd;
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			rnd = new Random();
+		}
 
 		@Override
 		public void invoke(Collector<Integer> collector) throws Exception {
-			Random rnd = new Random();
 			// continuous emit
 			while (true) {
 				if (counter > 9999) {
@@ -99,10 +104,49 @@ public class TimeWindowingExample {
 				// too fast for local tests and you might always see
 				// SUM[k=1..9999](k) as result.
 				Thread.sleep(1);
-
 				counter++;
 			}
 		}
+	}
+
+	private static final class MyKey implements KeySelector<Integer, Integer> {
 
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(Integer value) throws Exception {
+			if (value < 2) {
+				return 0;
+			} else {
+				return 1;
+			}
+		}
+
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: TimeWindowingExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TimeWindowingExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: TimeWindowingExample <result path>");
+		}
+		return true;
 	}
 }